Compare commits

..

8 Commits

Author SHA1 Message Date
Thomas Roehl
ccce00d64f Add support for credential file (NKEY) to NATS sink and receiver 2024-10-11 04:43:57 +02:00
Thomas Roehl
a36f8fe19d Test whether ipmitool or ipmi-sensors can be executed without errors 2024-07-26 16:46:16 +02:00
Thomas Roehl
2efed7c631 Merge branch 'develop' of github.com:ClusterCockpit/cc-metric-collector into develop 2024-07-15 12:42:58 +02:00
Thomas Roehl
2affb4d8a7 Update sink's README 2024-07-15 12:42:51 +02:00
Thomas Gruber
55cb12c9f8 Update README.md
Use right JSON type in configuration
2024-07-15 12:41:07 +02:00
Thomas Roehl
b69efdc2a4 Update runonce action to use golang 1.22 stable release, no golang RPMs anymore 2024-06-17 14:28:17 +02:00
Thomas Roehl
caa04da163 Update Release action to use golang 1.22 stable release, no golang RPMs anymore 2024-06-17 14:11:33 +02:00
Thomas Gruber
0ae537fdc9 Update main config in configuration.md 2024-06-17 11:07:51 +02:00
12 changed files with 86 additions and 448 deletions

View File

@@ -44,16 +44,16 @@ jobs:
# Use dnf to install build dependencies # Use dnf to install build dependencies
- name: Install build dependencies - name: Install build dependencies
run: | run: |
dnf --assumeyes install \ wget -q https://go.dev/dl/go1.22.4.linux-amd64.tar.gz --output-document=- | \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \ tar --directory=/usr/local --extract --gzip
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \ export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.21.7-1.module_el8+960+4060efbe.noarch.rpm \ go version
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm
- name: RPM build MetricCollector - name: RPM build MetricCollector
id: rpmbuild id: rpmbuild
run: | run: |
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
make RPM make RPM
# AlmaLinux 8 is a derivate of RedHat Enterprise Linux 8 (UBI8), # AlmaLinux 8 is a derivate of RedHat Enterprise Linux 8 (UBI8),
@@ -114,16 +114,16 @@ jobs:
# Use dnf to install build dependencies # Use dnf to install build dependencies
- name: Install build dependencies - name: Install build dependencies
run: | run: |
dnf --assumeyes --disableplugin=subscription-manager install \ wget -q https://go.dev/dl/go1.22.4.linux-amd64.tar.gz --output-document=- | \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \ tar --directory=/usr/local --extract --gzip
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \ export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.21.7-1.module_el8+960+4060efbe.noarch.rpm \ go version
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm
- name: RPM build MetricCollector - name: RPM build MetricCollector
id: rpmbuild id: rpmbuild
run: | run: |
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
make RPM make RPM
# See: https://github.com/actions/upload-artifact # See: https://github.com/actions/upload-artifact
@@ -165,7 +165,7 @@ jobs:
# Use official golang package # Use official golang package
- name: Install Golang - name: Install Golang
run: | run: |
wget -q https://go.dev/dl/go1.21.1.linux-amd64.tar.gz --output-document=- | \ wget -q https://go.dev/dl/go1.22.4.linux-amd64.tar.gz --output-document=- | \
tar --directory=/usr/local --extract --gzip tar --directory=/usr/local --extract --gzip
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
go version go version

View File

@@ -91,16 +91,16 @@ jobs:
# Use dnf to install build dependencies # Use dnf to install build dependencies
- name: Install build dependencies - name: Install build dependencies
run: | run: |
dnf --assumeyes install \ wget -q https://go.dev/dl/go1.22.4.linux-amd64.tar.gz --output-document=- | \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \ tar --directory=/usr/local --extract --gzip
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \ export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.21.7-1.module_el8+960+4060efbe.noarch.rpm \ go version
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm
- name: RPM build MetricCollector - name: RPM build MetricCollector
id: rpmbuild id: rpmbuild
run: | run: |
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
make RPM make RPM
# #
@@ -129,16 +129,16 @@ jobs:
# Use dnf to install build dependencies # Use dnf to install build dependencies
- name: Install build dependencies - name: Install build dependencies
run: | run: |
dnf --assumeyes --disableplugin=subscription-manager install \ wget -q https://go.dev/dl/go1.22.4.linux-amd64.tar.gz --output-document=- | \
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \ tar --directory=/usr/local --extract --gzip
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \ export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.21.7-1.module_el8+960+4060efbe.noarch.rpm \ go version
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm
- name: RPM build MetricCollector - name: RPM build MetricCollector
id: rpmbuild id: rpmbuild
run: | run: |
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
make RPM make RPM
# #
@@ -165,7 +165,7 @@ jobs:
# Use official golang package # Use official golang package
- name: Install Golang - name: Install Golang
run: | run: |
wget -q https://go.dev/dl/go1.21.1.linux-amd64.tar.gz --output-document=- | \ wget -q https://go.dev/dl/go1.22.4.linux-amd64.tar.gz --output-document=- | \
tar --directory=/usr/local --extract --gzip tar --directory=/usr/local --extract --gzip
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
go version go version

View File

@@ -12,6 +12,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
) )
@@ -54,15 +55,30 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
// Check if executables ipmitool or ipmisensors are found // Check if executables ipmitool or ipmisensors are found
p, err := exec.LookPath(m.config.IpmitoolPath) p, err := exec.LookPath(m.config.IpmitoolPath)
if err == nil { if err == nil {
m.ipmitool = p command := exec.Command(p)
err := command.Run()
if err != nil {
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %v", p, err.Error()))
m.ipmitool = ""
} else {
m.ipmitool = p
}
} }
p, err = exec.LookPath(m.config.IpmisensorsPath) p, err = exec.LookPath(m.config.IpmisensorsPath)
if err == nil { if err == nil {
m.ipmisensors = p command := exec.Command(p)
err := command.Run()
if err != nil {
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %v", p, err.Error()))
m.ipmisensors = ""
} else {
m.ipmisensors = p
}
} }
if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 { if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 {
return errors.New("no IPMI reader found") return errors.New("no usable IPMI reader found")
} }
m.init = true m.init = true
return nil return nil
} }
@@ -119,8 +135,8 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) {
cclog.ComponentError( cclog.ComponentError(
m.name, m.name,
fmt.Sprintf("readIpmiTool(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err), fmt.Sprintf("readIpmiTool(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err),
fmt.Sprintf("readIpmiTool(): command stderr: \"%s\"\n", string(errMsg)),
) )
cclog.ComponentError(m.name, fmt.Sprintf("readIpmiTool(): command stderr: \"%s\"\n", strings.TrimSpace(string(errMsg))))
return return
} }
} }

View File

@@ -12,8 +12,8 @@ The global file contains the paths to the other four files and some global optio
"collectors" : "collectors.json", "collectors" : "collectors.json",
"receivers" : "receivers.json", "receivers" : "receivers.json",
"router" : "router.json", "router" : "router.json",
"interval": 10, "interval": "10s",
"duration": 1 "duration": "1s"
} }
``` ```

View File

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"os"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
@@ -13,10 +14,13 @@ import (
) )
type NatsReceiverConfig struct { type NatsReceiverConfig struct {
Type string `json:"type"` Type string `json:"type"`
Addr string `json:"address"` Addr string `json:"address"`
Port string `json:"port"` Port string `json:"port"`
Subject string `json:"subject"` Subject string `json:"subject"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
NkeyFile string `json:"nkey_file,omitempty"`
} }
type NatsReceiver struct { type NatsReceiver struct {
@@ -109,6 +113,7 @@ func (r *NatsReceiver) Close() {
// NewNatsReceiver creates a new Receiver which subscribes to messages from a NATS server // NewNatsReceiver creates a new Receiver which subscribes to messages from a NATS server
func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) { func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) {
var uinfo nats.Option = nil
r := new(NatsReceiver) r := new(NatsReceiver)
r.name = fmt.Sprintf("NatsReceiver(%s)", name) r.name = fmt.Sprintf("NatsReceiver(%s)", name)
@@ -133,10 +138,22 @@ func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) {
"source": r.name, "source": r.name,
} }
if len(r.config.User) > 0 && len(r.config.Password) > 0 {
uinfo = nats.UserInfo(r.config.User, r.config.Password)
} else if len(r.config.NkeyFile) > 0 {
_, err := os.Stat(r.config.NkeyFile)
if err == nil {
uinfo = nats.UserCredentials(r.config.NkeyFile)
} else {
cclog.ComponentError(r.name, "NKEY file", r.config.NkeyFile, "does not exist: %v", err.Error())
return nil, err
}
}
// Connect to NATS server // Connect to NATS server
url := fmt.Sprintf("nats://%s:%s", r.config.Addr, r.config.Port) url := fmt.Sprintf("nats://%s:%s", r.config.Addr, r.config.Port)
cclog.ComponentDebug(r.name, "NewNatsReceiver", url, "Subject", r.config.Subject) cclog.ComponentDebug(r.name, "NewNatsReceiver", url, "Subject", r.config.Subject)
if nc, err := nats.Connect(url); err == nil { if nc, err := nats.Connect(url, uinfo); err == nil {
r.nc = nc r.nc = nc
} else { } else {
r.nc = nil r.nc = nil

View File

@@ -10,7 +10,10 @@ The `nats` receiver can be used receive metrics from the NATS network. The `nats
"type": "nats", "type": "nats",
"address" : "nats-server.example.org", "address" : "nats-server.example.org",
"port" : "4222", "port" : "4222",
"subject" : "subject" "subject" : "subject",
"user": "natsuser",
"password": "natssecret",
"nkey_file": "/path/to/nkey_file"
} }
} }
``` ```
@@ -19,6 +22,9 @@ The `nats` receiver can be used receive metrics from the NATS network. The `nats
- `address`: Address of the NATS control server - `address`: Address of the NATS control server
- `port`: Port of the NATS control server - `port`: Port of the NATS control server
- `subject`: Subscribes to this subject and receive metrics - `subject`: Subscribes to this subject and receive metrics
- `user`: Connect to nats using this user
- `password`: Connect to nats using this password
- `nkey_file`: Path to credentials file with NKEY
### Debugging ### Debugging

View File

@@ -17,7 +17,7 @@ This folder contains the SinkManager and sink implementations for the cc-metric-
The configuration file for the sinks is a list of configurations. The `type` field in each specifies which sink to initialize. The configuration file for the sinks is a list of configurations. The `type` field in each specifies which sink to initialize.
```json ```json
[ {
"mystdout" : { "mystdout" : {
"type" : "stdout", "type" : "stdout",
"meta_as_tags" : [ "meta_as_tags" : [
@@ -31,7 +31,7 @@ The configuration file for the sinks is a list of configurations. The `type` fie
"database" : "ccmetric", "database" : "ccmetric",
"password" : "<jwt token>" "password" : "<jwt token>"
} }
] }
``` ```

View File

@@ -1,372 +0,0 @@
package sinks
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"sync"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
"github.com/go-mqtt/mqtt"
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
"golang.org/x/exp/slices"
)
type MqttSinkConfig struct {
// defines JSON tags for 'type' and 'meta_as_tags' (string list)
// See: metricSink.go
defaultSinkConfig
// Additional config options, for MqttSink
ClientID string `json:"client_id"`
PersistenceDirectory string `json:"persistence_directory,omitempty"`
// Maximum number of points sent to server in single request.
// Default: 1000
BatchSize int `json:"batch_size,omitempty"`
// Time interval for delayed sending of metrics.
// If the buffers are already filled before the end of this interval,
// the metrics are sent without further delay.
// Default: 1s
FlushInterval string `json:"flush_delay,omitempty"`
flushDelay time.Duration
DialProtocol string `json:"dial_protocol"`
Hostname string `json:"hostname"`
Port int `json:"port"`
PauseTimeout string `json:"pause_timeout"`
pauseTimeout time.Duration
KeepAlive uint16 `json:"keep_alive_seconds"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
}
type MqttSink struct {
// declares elements 'name' and 'meta_as_tags' (string to bool map!)
sink
config MqttSinkConfig // entry point to the MqttSinkConfig
// influx line protocol encoder
encoder influx.Encoder
// number of records stored in the encoder
numRecordsInEncoder int
// List of tags and meta data tags which should be used as tags
extended_tag_list []key_value_pair
// Flush() runs in another goroutine and accesses the influx line protocol encoder,
// so this encoderLock has to protect the encoder and numRecordsInEncoder
encoderLock sync.Mutex
// timer to run Flush()
flushTimer *time.Timer
// Lock to assure that only one timer is running at a time
timerLock sync.Mutex
// WaitGroup to ensure only one send operation is running at a time
sendWaitGroup sync.WaitGroup
client *mqtt.Client
mqttconfig mqtt.Config
}
// Implement functions required for Sink interface
// Write(...), Flush(), Close()
// See: metricSink.go
// Code to submit a single CCMetric to the sink
func (s *MqttSink) Write(m lp.CCMetric) error {
// Lock for encoder usage
s.encoderLock.Lock()
// Encode measurement name
s.encoder.StartLine(m.Name())
// copy tags and meta data which should be used as tags
s.extended_tag_list = s.extended_tag_list[:0]
for key, value := range m.Tags() {
s.extended_tag_list =
append(
s.extended_tag_list,
key_value_pair{
key: key,
value: value,
},
)
}
for _, key := range s.config.MetaAsTags {
if value, ok := m.GetMeta(key); ok {
s.extended_tag_list =
append(
s.extended_tag_list,
key_value_pair{
key: key,
value: value,
},
)
}
}
// Encode tags (they musts be in lexical order)
slices.SortFunc(
s.extended_tag_list,
func(a key_value_pair, b key_value_pair) int {
if a.key < b.key {
return -1
}
if a.key > b.key {
return +1
}
return 0
},
)
for i := range s.extended_tag_list {
s.encoder.AddTag(
s.extended_tag_list[i].key,
s.extended_tag_list[i].value,
)
}
// Encode fields
for key, value := range m.Fields() {
s.encoder.AddField(key, influx.MustNewValue(value))
}
// Encode time stamp
s.encoder.EndLine(m.Time())
// Check for encoder errors
if err := s.encoder.Err(); err != nil {
// Unlock encoder usage
s.encoderLock.Unlock()
return fmt.Errorf("encoding failed: %v", err)
}
s.numRecordsInEncoder++
if s.config.flushDelay == 0 {
// Unlock encoder usage
s.encoderLock.Unlock()
// Directly flush if no flush delay is configured
return s.Flush()
} else if s.numRecordsInEncoder == s.config.BatchSize {
// Unlock encoder usage
s.encoderLock.Unlock()
// Stop flush timer
if s.flushTimer != nil {
if ok := s.flushTimer.Stop(); ok {
cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay")
s.timerLock.Unlock()
}
}
// Flush if batch size is reached
return s.Flush()
} else if s.timerLock.TryLock() {
// Setup flush timer when flush delay is configured
// and no other timer is already running
if s.flushTimer != nil {
// Restarting existing flush timer
cclog.ComponentDebug(s.name, "Write(): Restarting flush timer")
s.flushTimer.Reset(s.config.flushDelay)
} else {
// Creating and starting flush timer
cclog.ComponentDebug(s.name, "Write(): Starting new flush timer")
s.flushTimer = time.AfterFunc(
s.config.flushDelay,
func() {
defer s.timerLock.Unlock()
cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer")
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err)
}
})
}
}
// Unlock encoder usage
s.encoderLock.Unlock()
return nil
}
// If the sink uses batched sends internally, you can tell to flush its buffers
func (s *MqttSink) Flush() error {
// Lock for encoder usage
// Own lock for as short as possible: the time it takes to clone the buffer.
s.encoderLock.Lock()
buf := slices.Clone(s.encoder.Bytes())
numRecordsInBuf := s.numRecordsInEncoder
s.encoder.Reset()
s.numRecordsInEncoder = 0
// Unlock encoder usage
s.encoderLock.Unlock()
if len(buf) == 0 {
return nil
}
cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics")
// Asynchron send of encoder metrics
s.sendWaitGroup.Add(1)
go func() {
defer s.sendWaitGroup.Done()
//startTime := time.Now()
for {
exchange, err := s.client.PublishAtLeastOnce(buf, s.config.ClientID)
switch {
case err == nil:
return
case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed):
return
case errors.Is(err, mqtt.ErrMax):
time.Sleep(s.config.pauseTimeout)
default:
time.Sleep(s.config.pauseTimeout)
continue
}
for err := range exchange {
if errors.Is(err, mqtt.ErrClosed) {
return
}
}
return
}
}()
return nil
}
// Close sink: close network connection, close files, close libraries, ...
func (s *MqttSink) Close() {
cclog.ComponentDebug(s.name, "CLOSE")
// Stop existing timer and immediately flush
if s.flushTimer != nil {
if ok := s.flushTimer.Stop(); ok {
s.timerLock.Unlock()
}
}
// Flush
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "Close():", "Flush failed:", err)
}
// Wait for send operations to finish
s.sendWaitGroup.Wait()
s.client.Close()
s.client = nil
}
// New function to create a new instance of the sink
// Initialize the sink by giving it a name and reading in the config JSON
func NewMqttSink(name string, config json.RawMessage) (Sink, error) {
s := new(MqttSink)
// Set name of sampleSink
// The name should be chosen in such a way that different instances of MqttSink can be distinguished
s.name = fmt.Sprintf("MqttSink(%s)", name) // Always specify a name here
// Set defaults in s.config
// Allow overwriting these defaults by reading config JSON
s.config.PauseTimeout = "4s"
s.config.pauseTimeout = time.Duration(4) * time.Second
s.config.DialProtocol = "tcp"
s.config.Hostname = "localhost"
s.config.Port = 1883
// Read in the config JSON
if len(config) > 0 {
d := json.NewDecoder(bytes.NewReader(config))
d.DisallowUnknownFields()
if err := d.Decode(&s.config); err != nil {
cclog.ComponentError(s.name, "Error reading config:", err.Error())
return nil, err
}
}
// Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags {
s.meta_as_tags[k] = true
}
// Check if all required fields in the config are set
// E.g. use 'len(s.config.Option) > 0' for string settings
if t, err := time.ParseDuration(s.config.PauseTimeout); err == nil {
s.config.pauseTimeout = t
} else {
err := fmt.Errorf("to parse duration for PauseTimeout: %s", s.config.PauseTimeout)
cclog.ComponentError(s.name, err.Error())
return nil, err
}
if t, err := time.ParseDuration(s.config.FlushInterval); err == nil {
s.config.flushDelay = t
} else {
err := fmt.Errorf("to parse duration for FlushInterval: %s", s.config.FlushInterval)
cclog.ComponentError(s.name, err.Error())
return nil, err
}
switch s.config.DialProtocol {
case "tcp", "udp":
default:
err := errors.New("failed to parse dial protocol, allowed: tcp, udp")
cclog.ComponentError(s.name, err.Error())
return nil, err
}
var persistence mqtt.Persistence
if len(s.config.PersistenceDirectory) > 0 {
persistence = mqtt.FileSystem(s.config.PersistenceDirectory)
} else {
tmpdir, err := os.MkdirTemp("", "mqtt")
if err == nil {
persistence = mqtt.FileSystem(tmpdir)
}
}
// Establish connection to the server, library, ...
// Check required files exist and lookup path(s) of executable(s)
dialer := mqtt.NewDialer(s.config.DialProtocol, net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port)))
s.mqttconfig = mqtt.Config{
Dialer: dialer,
PauseTimeout: s.config.pauseTimeout,
KeepAlive: uint16(s.config.KeepAlive),
}
if len(s.config.Username) > 0 {
s.mqttconfig.UserName = s.config.Username
}
if len(s.config.Password) > 0 {
s.mqttconfig.Password = []byte(s.config.Password)
}
client, err := mqtt.InitSession(s.config.ClientID, persistence, &s.mqttconfig)
if err != nil {
return nil, err
}
s.client = client
// Return (nil, meaningful error message) in case of errors
return s, nil
}

View File

@@ -1,39 +0,0 @@
## `mqtt` sink
The `mqtt` sink publishes all metrics into a MQTT network.
### Configuration structure
```json
{
"<name>": {
"type": "mqtt",
"client_id" : "myid",
"persistence_directory": "/tmp",
"batch_size": 1000,
"flush_delay": "1s",
"dial_protocol": "tcp",
"host": "dbhost.example.com",
"port": 1883,
"user": "exampleuser",
"password" : "examplepw",
"pause_timeout": "1s",
"keep_alive_seconds": 10,
"meta_as_tags" : [],
}
}
```
- `type`: makes the sink an `mqtt` sink
- `client_id`: MQTT clients use a client_id to talk to the broker
- `persistence_directory`: MQTT stores messages temporarly on disc if the broker is not available. Folder needs to be writable (default: `/tmp`)
- `pause_timeout`: Waittime when published failed
- `keep_alive_seconds`: Keep the connection alive for some time. Recommended to be longer than global `interval`.
- `flush_delay`: Group metrics coming in to a single batch
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay
- `dial_protocol`: Use `tcp` or `udp` for the MQTT communication
- `host`: Hostname of the MQTT broker
- `port`: Port number of the MQTT broker
- `user`: Username for authentication
- `password`: Password for authentication
- `meta_as_tags`: print all meta information as tags in the output (optional)

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"os"
"sync" "sync"
"time" "time"
@@ -22,6 +23,7 @@ type NatsSinkConfig struct {
User string `json:"user,omitempty"` User string `json:"user,omitempty"`
Password string `json:"password,omitempty"` Password string `json:"password,omitempty"`
FlushDelay string `json:"flush_delay,omitempty"` FlushDelay string `json:"flush_delay,omitempty"`
NkeyFile string `json:"nkey_file,omitempty"`
} }
type NatsSink struct { type NatsSink struct {
@@ -42,6 +44,13 @@ func (s *NatsSink) connect() error {
var nc *nats.Conn var nc *nats.Conn
if len(s.config.User) > 0 && len(s.config.Password) > 0 { if len(s.config.User) > 0 && len(s.config.Password) > 0 {
uinfo = nats.UserInfo(s.config.User, s.config.Password) uinfo = nats.UserInfo(s.config.User, s.config.Password)
} else if len(s.config.NkeyFile) > 0 {
if _, err := os.Stat(s.config.NkeyFile); err == nil {
uinfo = nats.UserCredentials(s.config.NkeyFile)
} else {
cclog.ComponentError(s.name, "NKEY file", s.config.NkeyFile, "does not exist: %v", err.Error())
return err
}
} }
uri := fmt.Sprintf("nats://%s:%s", s.config.Host, s.config.Port) uri := fmt.Sprintf("nats://%s:%s", s.config.Host, s.config.Port)
cclog.ComponentDebug(s.name, "Connect to", uri) cclog.ComponentDebug(s.name, "Connect to", uri)

View File

@@ -13,6 +13,7 @@ The `nats` sink publishes all metrics into a NATS network. The publishing key is
"port": "4222", "port": "4222",
"user": "exampleuser", "user": "exampleuser",
"password" : "examplepw", "password" : "examplepw",
"nkey_file": "/path/to/nkey_file",
"meta_as_tags" : [], "meta_as_tags" : [],
} }
} }
@@ -25,3 +26,4 @@ The `nats` sink publishes all metrics into a NATS network. The publishing key is
- `user`: Username for basic authentication - `user`: Username for basic authentication
- `password`: Password for basic authentication - `password`: Password for basic authentication
- `meta_as_tags`: print all meta information as tags in the output (optional) - `meta_as_tags`: print all meta information as tags in the output (optional)
- `nkey_file`: Path to credentials file with NKEY

View File

@@ -21,7 +21,6 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink,
"influxdb": NewInfluxSink, "influxdb": NewInfluxSink,
"influxasync": NewInfluxAsyncSink, "influxasync": NewInfluxAsyncSink,
"http": NewHttpSink, "http": NewHttpSink,
"mqtt": NewMqttSink,
} }
// Metric collector manager data structure // Metric collector manager data structure