mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-07-19 19:31:41 +02:00
Compare commits
2 Commits
mqtt_sink
...
likwid_col
Author | SHA1 | Date | |
---|---|---|---|
|
9ca73a9f50 | ||
|
0186dce521 |
@@ -374,6 +374,14 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig,
|
||||
}
|
||||
defer watcher.Close()
|
||||
if len(m.config.LockfilePath) > 0 {
|
||||
if _, err := os.Stat(m.config.LockfilePath); os.IsNotExist(err) {
|
||||
file, err := os.Create(m.config.LockfilePath)
|
||||
if err != nil {
|
||||
cclog.ComponentError(m.name, "Cannot create lockfile", m.config.LockfilePath, ":", err.Error())
|
||||
return true, err
|
||||
}
|
||||
file.Close()
|
||||
}
|
||||
info, err := os.Stat(m.config.LockfilePath)
|
||||
if err != nil {
|
||||
return true, err
|
||||
@@ -382,9 +390,9 @@ func (m *LikwidCollector) takeMeasurement(evidx int, evset LikwidEventsetConfig,
|
||||
if uid != uint32(os.Getuid()) {
|
||||
usr, err := user.LookupId(fmt.Sprint(uid))
|
||||
if err == nil {
|
||||
return true, fmt.Errorf("Access to performance counters locked by %s", usr.Username)
|
||||
return true, fmt.Errorf("access to performance counters locked by %s", usr.Username)
|
||||
} else {
|
||||
return true, fmt.Errorf("Access to performance counters locked by %d", uid)
|
||||
return true, fmt.Errorf("access to performance counters locked by %d", uid)
|
||||
}
|
||||
}
|
||||
err = watcher.Add(m.config.LockfilePath)
|
||||
|
@@ -267,3 +267,45 @@ IPC PMC0/PMC1 -> {
|
||||
```
|
||||
|
||||
The script `scripts/likwid_perfgroup_to_cc_config.py` might help you.
|
||||
|
||||
### Internal structure
|
||||
|
||||
This section describes the internal structure of the `likwid` collector.
|
||||
|
||||
#### At initialization
|
||||
|
||||
After setting the defaults, the configuration is read.
|
||||
|
||||
Based on the configuration, the library is searched using `dlopen` to see whether it makes sense to proceed.
|
||||
|
||||
Next, the user-given metrics are tested to ensure they can be evaluated. For this, it creates a list of all user-given events/counters with the value `1.0` which is provided to the metric evaluator. The same is done for the global metrics by using the metric names with value `1.0`. If the evaluator does not fail, the metric can be evaluated and the collector initialization can proceed.
|
||||
|
||||
A separate thread is started to do the measurement. This is not done using a common goroutine but a real application thread with full control. This is required because LIKWID's access system tracks the processes of the using application and the PID should not change between measurements because that would require teardown and reopening of the access system.
|
||||
|
||||
With the separate thread, the access system is initialized by setting the user-given access mode and adding all hardware threads.
|
||||
|
||||
LIKWID measures per hardware thread in general but only some HW threads read the counters available only e.g. per CPU socket (often memory traffic). For this, the collector gets the system topology through LIKWID and creates different mappings like 'hwthread to list offset' and others. With this, the hardware threads responsible for a topological entity can be determined because those read the counters of the per CPU socket units. These mappings are later used in the measurement phase.
|
||||
|
||||
In the end, we read the base CPU frequency of the system. It may be used in the metric evaluation.
|
||||
|
||||
#### Measurements
|
||||
|
||||
The reading of events is done by the separate application thread.
|
||||
|
||||
It traverses over all configured event sets, creates valid LIKWID eventstrings out of them and pass them to take a measurement. This could be done only once but when the LIKWID lock changes, LIKWID has to be completely reopened to provide access again. With this reopening, the already added event sets are gone.
|
||||
|
||||
LIKWID has it's own locking mechanism using a lock file. But not the content of the file is of interest but the owner. In order to track changes of the file, a `fsnotify` watcher is installed on the file. If the file does not exist, it is created and consequently is owned by the same user as `cc-metric-collector`. The LikwidCollector has to watch the file on it's own because LIKWID does not provide proper error handling for this.
|
||||
|
||||
Each call to the LIKWID library for loading the event set, setting up the counting facilities as well as starting and stopping of the counters is wrapped into lockfile checks to ensure no state change happens. If the file owner changed, the LikwidCollector cannot access the counters anymore, so no further operation can be done and measurment stops.
|
||||
|
||||
Although start/stop would be sufficient, the LikwidCollector performs start, read, wait, read, `getLastResult`, stop. Reason might be "historic" but is not 100% clear anymore. The author failed to document ;)
|
||||
|
||||
#### Metric evaluation
|
||||
|
||||
After each meaurement, the metrics of the event set are directly evaluated. It updates the counter->result mapping with the new measurements, calls the evaluator and generates the `CCMetric` with the user-given settings if it should be published. Each metric name to result calculation is stored for the global metric evaluation, which is done as a final step.
|
||||
|
||||
#### Shutdown
|
||||
|
||||
Since each measurment involves a complete initialize to finalize cycle of the LIKWID library, only the topology module needs to be closed.
|
||||
|
||||
Moreover, the separate application thread is stopped.
|
@@ -1,372 +0,0 @@
|
||||
package sinks
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||
"github.com/go-mqtt/mqtt"
|
||||
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
type MqttSinkConfig struct {
|
||||
// defines JSON tags for 'type' and 'meta_as_tags' (string list)
|
||||
// See: metricSink.go
|
||||
defaultSinkConfig
|
||||
// Additional config options, for MqttSink
|
||||
ClientID string `json:"client_id"`
|
||||
PersistenceDirectory string `json:"persistence_directory,omitempty"`
|
||||
// Maximum number of points sent to server in single request.
|
||||
// Default: 1000
|
||||
BatchSize int `json:"batch_size,omitempty"`
|
||||
|
||||
// Time interval for delayed sending of metrics.
|
||||
// If the buffers are already filled before the end of this interval,
|
||||
// the metrics are sent without further delay.
|
||||
// Default: 1s
|
||||
FlushInterval string `json:"flush_delay,omitempty"`
|
||||
flushDelay time.Duration
|
||||
|
||||
DialProtocol string `json:"dial_protocol"`
|
||||
Hostname string `json:"hostname"`
|
||||
Port int `json:"port"`
|
||||
PauseTimeout string `json:"pause_timeout"`
|
||||
pauseTimeout time.Duration
|
||||
KeepAlive uint16 `json:"keep_alive_seconds"`
|
||||
Username string `json:"username,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
}
|
||||
|
||||
type MqttSink struct {
|
||||
// declares elements 'name' and 'meta_as_tags' (string to bool map!)
|
||||
sink
|
||||
config MqttSinkConfig // entry point to the MqttSinkConfig
|
||||
// influx line protocol encoder
|
||||
encoder influx.Encoder
|
||||
// number of records stored in the encoder
|
||||
numRecordsInEncoder int
|
||||
// List of tags and meta data tags which should be used as tags
|
||||
extended_tag_list []key_value_pair
|
||||
// Flush() runs in another goroutine and accesses the influx line protocol encoder,
|
||||
// so this encoderLock has to protect the encoder and numRecordsInEncoder
|
||||
encoderLock sync.Mutex
|
||||
|
||||
// timer to run Flush()
|
||||
flushTimer *time.Timer
|
||||
// Lock to assure that only one timer is running at a time
|
||||
timerLock sync.Mutex
|
||||
|
||||
// WaitGroup to ensure only one send operation is running at a time
|
||||
sendWaitGroup sync.WaitGroup
|
||||
|
||||
client *mqtt.Client
|
||||
mqttconfig mqtt.Config
|
||||
}
|
||||
|
||||
// Implement functions required for Sink interface
|
||||
// Write(...), Flush(), Close()
|
||||
// See: metricSink.go
|
||||
|
||||
// Code to submit a single CCMetric to the sink
|
||||
func (s *MqttSink) Write(m lp.CCMetric) error {
|
||||
|
||||
// Lock for encoder usage
|
||||
s.encoderLock.Lock()
|
||||
|
||||
// Encode measurement name
|
||||
s.encoder.StartLine(m.Name())
|
||||
|
||||
// copy tags and meta data which should be used as tags
|
||||
s.extended_tag_list = s.extended_tag_list[:0]
|
||||
for key, value := range m.Tags() {
|
||||
s.extended_tag_list =
|
||||
append(
|
||||
s.extended_tag_list,
|
||||
key_value_pair{
|
||||
key: key,
|
||||
value: value,
|
||||
},
|
||||
)
|
||||
}
|
||||
for _, key := range s.config.MetaAsTags {
|
||||
if value, ok := m.GetMeta(key); ok {
|
||||
s.extended_tag_list =
|
||||
append(
|
||||
s.extended_tag_list,
|
||||
key_value_pair{
|
||||
key: key,
|
||||
value: value,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Encode tags (they musts be in lexical order)
|
||||
slices.SortFunc(
|
||||
s.extended_tag_list,
|
||||
func(a key_value_pair, b key_value_pair) int {
|
||||
if a.key < b.key {
|
||||
return -1
|
||||
}
|
||||
if a.key > b.key {
|
||||
return +1
|
||||
}
|
||||
return 0
|
||||
},
|
||||
)
|
||||
for i := range s.extended_tag_list {
|
||||
s.encoder.AddTag(
|
||||
s.extended_tag_list[i].key,
|
||||
s.extended_tag_list[i].value,
|
||||
)
|
||||
}
|
||||
|
||||
// Encode fields
|
||||
for key, value := range m.Fields() {
|
||||
s.encoder.AddField(key, influx.MustNewValue(value))
|
||||
}
|
||||
|
||||
// Encode time stamp
|
||||
s.encoder.EndLine(m.Time())
|
||||
|
||||
// Check for encoder errors
|
||||
if err := s.encoder.Err(); err != nil {
|
||||
// Unlock encoder usage
|
||||
s.encoderLock.Unlock()
|
||||
|
||||
return fmt.Errorf("encoding failed: %v", err)
|
||||
}
|
||||
s.numRecordsInEncoder++
|
||||
|
||||
if s.config.flushDelay == 0 {
|
||||
// Unlock encoder usage
|
||||
s.encoderLock.Unlock()
|
||||
|
||||
// Directly flush if no flush delay is configured
|
||||
return s.Flush()
|
||||
} else if s.numRecordsInEncoder == s.config.BatchSize {
|
||||
// Unlock encoder usage
|
||||
s.encoderLock.Unlock()
|
||||
|
||||
// Stop flush timer
|
||||
if s.flushTimer != nil {
|
||||
if ok := s.flushTimer.Stop(); ok {
|
||||
cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay")
|
||||
s.timerLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Flush if batch size is reached
|
||||
return s.Flush()
|
||||
} else if s.timerLock.TryLock() {
|
||||
|
||||
// Setup flush timer when flush delay is configured
|
||||
// and no other timer is already running
|
||||
if s.flushTimer != nil {
|
||||
|
||||
// Restarting existing flush timer
|
||||
cclog.ComponentDebug(s.name, "Write(): Restarting flush timer")
|
||||
s.flushTimer.Reset(s.config.flushDelay)
|
||||
} else {
|
||||
|
||||
// Creating and starting flush timer
|
||||
cclog.ComponentDebug(s.name, "Write(): Starting new flush timer")
|
||||
s.flushTimer = time.AfterFunc(
|
||||
s.config.flushDelay,
|
||||
func() {
|
||||
defer s.timerLock.Unlock()
|
||||
cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer")
|
||||
if err := s.Flush(); err != nil {
|
||||
cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Unlock encoder usage
|
||||
s.encoderLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// If the sink uses batched sends internally, you can tell to flush its buffers
|
||||
func (s *MqttSink) Flush() error {
|
||||
|
||||
// Lock for encoder usage
|
||||
// Own lock for as short as possible: the time it takes to clone the buffer.
|
||||
s.encoderLock.Lock()
|
||||
|
||||
buf := slices.Clone(s.encoder.Bytes())
|
||||
numRecordsInBuf := s.numRecordsInEncoder
|
||||
s.encoder.Reset()
|
||||
s.numRecordsInEncoder = 0
|
||||
|
||||
// Unlock encoder usage
|
||||
s.encoderLock.Unlock()
|
||||
|
||||
if len(buf) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics")
|
||||
|
||||
// Asynchron send of encoder metrics
|
||||
s.sendWaitGroup.Add(1)
|
||||
go func() {
|
||||
defer s.sendWaitGroup.Done()
|
||||
//startTime := time.Now()
|
||||
for {
|
||||
exchange, err := s.client.PublishAtLeastOnce(buf, s.config.ClientID)
|
||||
switch {
|
||||
case err == nil:
|
||||
return
|
||||
|
||||
case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed):
|
||||
return
|
||||
|
||||
case errors.Is(err, mqtt.ErrMax):
|
||||
time.Sleep(s.config.pauseTimeout)
|
||||
|
||||
default:
|
||||
time.Sleep(s.config.pauseTimeout)
|
||||
continue
|
||||
}
|
||||
|
||||
for err := range exchange {
|
||||
if errors.Is(err, mqtt.ErrClosed) {
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
return
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close sink: close network connection, close files, close libraries, ...
|
||||
func (s *MqttSink) Close() {
|
||||
|
||||
cclog.ComponentDebug(s.name, "CLOSE")
|
||||
|
||||
// Stop existing timer and immediately flush
|
||||
if s.flushTimer != nil {
|
||||
if ok := s.flushTimer.Stop(); ok {
|
||||
s.timerLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Flush
|
||||
if err := s.Flush(); err != nil {
|
||||
cclog.ComponentError(s.name, "Close():", "Flush failed:", err)
|
||||
}
|
||||
|
||||
// Wait for send operations to finish
|
||||
s.sendWaitGroup.Wait()
|
||||
|
||||
s.client.Close()
|
||||
s.client = nil
|
||||
}
|
||||
|
||||
// New function to create a new instance of the sink
|
||||
// Initialize the sink by giving it a name and reading in the config JSON
|
||||
func NewMqttSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(MqttSink)
|
||||
|
||||
// Set name of sampleSink
|
||||
// The name should be chosen in such a way that different instances of MqttSink can be distinguished
|
||||
s.name = fmt.Sprintf("MqttSink(%s)", name) // Always specify a name here
|
||||
|
||||
// Set defaults in s.config
|
||||
// Allow overwriting these defaults by reading config JSON
|
||||
|
||||
s.config.PauseTimeout = "4s"
|
||||
s.config.pauseTimeout = time.Duration(4) * time.Second
|
||||
s.config.DialProtocol = "tcp"
|
||||
s.config.Hostname = "localhost"
|
||||
s.config.Port = 1883
|
||||
|
||||
// Read in the config JSON
|
||||
if len(config) > 0 {
|
||||
d := json.NewDecoder(bytes.NewReader(config))
|
||||
d.DisallowUnknownFields()
|
||||
if err := d.Decode(&s.config); err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config:", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Create lookup map to use meta infos as tags in the output metric
|
||||
s.meta_as_tags = make(map[string]bool)
|
||||
for _, k := range s.config.MetaAsTags {
|
||||
s.meta_as_tags[k] = true
|
||||
}
|
||||
|
||||
// Check if all required fields in the config are set
|
||||
// E.g. use 'len(s.config.Option) > 0' for string settings
|
||||
if t, err := time.ParseDuration(s.config.PauseTimeout); err == nil {
|
||||
s.config.pauseTimeout = t
|
||||
} else {
|
||||
err := fmt.Errorf("to parse duration for PauseTimeout: %s", s.config.PauseTimeout)
|
||||
cclog.ComponentError(s.name, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
if t, err := time.ParseDuration(s.config.FlushInterval); err == nil {
|
||||
s.config.flushDelay = t
|
||||
} else {
|
||||
err := fmt.Errorf("to parse duration for FlushInterval: %s", s.config.FlushInterval)
|
||||
cclog.ComponentError(s.name, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch s.config.DialProtocol {
|
||||
case "tcp", "udp":
|
||||
default:
|
||||
err := errors.New("failed to parse dial protocol, allowed: tcp, udp")
|
||||
cclog.ComponentError(s.name, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var persistence mqtt.Persistence
|
||||
if len(s.config.PersistenceDirectory) > 0 {
|
||||
persistence = mqtt.FileSystem(s.config.PersistenceDirectory)
|
||||
} else {
|
||||
tmpdir, err := os.MkdirTemp("", "mqtt")
|
||||
if err == nil {
|
||||
persistence = mqtt.FileSystem(tmpdir)
|
||||
}
|
||||
}
|
||||
|
||||
// Establish connection to the server, library, ...
|
||||
// Check required files exist and lookup path(s) of executable(s)
|
||||
|
||||
dialer := mqtt.NewDialer(s.config.DialProtocol, net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port)))
|
||||
|
||||
s.mqttconfig = mqtt.Config{
|
||||
Dialer: dialer,
|
||||
PauseTimeout: s.config.pauseTimeout,
|
||||
KeepAlive: uint16(s.config.KeepAlive),
|
||||
}
|
||||
if len(s.config.Username) > 0 {
|
||||
s.mqttconfig.UserName = s.config.Username
|
||||
}
|
||||
if len(s.config.Password) > 0 {
|
||||
s.mqttconfig.Password = []byte(s.config.Password)
|
||||
}
|
||||
|
||||
client, err := mqtt.InitSession(s.config.ClientID, persistence, &s.mqttconfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.client = client
|
||||
|
||||
// Return (nil, meaningful error message) in case of errors
|
||||
return s, nil
|
||||
}
|
@@ -1,39 +0,0 @@
|
||||
## `mqtt` sink
|
||||
|
||||
The `mqtt` sink publishes all metrics into a MQTT network.
|
||||
|
||||
### Configuration structure
|
||||
|
||||
```json
|
||||
{
|
||||
"<name>": {
|
||||
"type": "mqtt",
|
||||
"client_id" : "myid",
|
||||
"persistence_directory": "/tmp",
|
||||
"batch_size": 1000,
|
||||
"flush_delay": "1s",
|
||||
"dial_protocol": "tcp",
|
||||
"host": "dbhost.example.com",
|
||||
"port": 1883,
|
||||
"user": "exampleuser",
|
||||
"password" : "examplepw",
|
||||
"pause_timeout": "1s",
|
||||
"keep_alive_seconds": 10,
|
||||
"meta_as_tags" : [],
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- `type`: makes the sink an `mqtt` sink
|
||||
- `client_id`: MQTT clients use a client_id to talk to the broker
|
||||
- `persistence_directory`: MQTT stores messages temporarly on disc if the broker is not available. Folder needs to be writable (default: `/tmp`)
|
||||
- `pause_timeout`: Waittime when published failed
|
||||
- `keep_alive_seconds`: Keep the connection alive for some time. Recommended to be longer than global `interval`.
|
||||
- `flush_delay`: Group metrics coming in to a single batch
|
||||
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay
|
||||
- `dial_protocol`: Use `tcp` or `udp` for the MQTT communication
|
||||
- `host`: Hostname of the MQTT broker
|
||||
- `port`: Port number of the MQTT broker
|
||||
- `user`: Username for authentication
|
||||
- `password`: Password for authentication
|
||||
- `meta_as_tags`: print all meta information as tags in the output (optional)
|
@@ -21,7 +21,6 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink,
|
||||
"influxdb": NewInfluxSink,
|
||||
"influxasync": NewInfluxAsyncSink,
|
||||
"http": NewHttpSink,
|
||||
"mqtt": NewMqttSink,
|
||||
}
|
||||
|
||||
// Metric collector manager data structure
|
||||
|
Reference in New Issue
Block a user