mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-07-19 19:31:41 +02:00
Compare commits
8 Commits
amqp_sink
...
nats_nkey_
Author | SHA1 | Date | |
---|---|---|---|
|
ccce00d64f | ||
|
a36f8fe19d | ||
|
2efed7c631 | ||
|
2affb4d8a7 | ||
|
55cb12c9f8 | ||
|
b69efdc2a4 | ||
|
caa04da163 | ||
|
0ae537fdc9 |
22
.github/workflows/Release.yml
vendored
22
.github/workflows/Release.yml
vendored
@@ -44,16 +44,16 @@ jobs:
|
|||||||
# Use dnf to install build dependencies
|
# Use dnf to install build dependencies
|
||||||
- name: Install build dependencies
|
- name: Install build dependencies
|
||||||
run: |
|
run: |
|
||||||
dnf --assumeyes install \
|
wget -q https://go.dev/dl/go1.22.4.linux-amd64.tar.gz --output-document=- | \
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
|
tar --directory=/usr/local --extract --gzip
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
|
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.21.7-1.module_el8+960+4060efbe.noarch.rpm \
|
go version
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm
|
|
||||||
|
|
||||||
- name: RPM build MetricCollector
|
- name: RPM build MetricCollector
|
||||||
id: rpmbuild
|
id: rpmbuild
|
||||||
run: |
|
run: |
|
||||||
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
|
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
|
||||||
|
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
|
||||||
make RPM
|
make RPM
|
||||||
|
|
||||||
# AlmaLinux 8 is a derivate of RedHat Enterprise Linux 8 (UBI8),
|
# AlmaLinux 8 is a derivate of RedHat Enterprise Linux 8 (UBI8),
|
||||||
@@ -114,16 +114,16 @@ jobs:
|
|||||||
# Use dnf to install build dependencies
|
# Use dnf to install build dependencies
|
||||||
- name: Install build dependencies
|
- name: Install build dependencies
|
||||||
run: |
|
run: |
|
||||||
dnf --assumeyes --disableplugin=subscription-manager install \
|
wget -q https://go.dev/dl/go1.22.4.linux-amd64.tar.gz --output-document=- | \
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
|
tar --directory=/usr/local --extract --gzip
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
|
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.21.7-1.module_el8+960+4060efbe.noarch.rpm \
|
go version
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm
|
|
||||||
|
|
||||||
- name: RPM build MetricCollector
|
- name: RPM build MetricCollector
|
||||||
id: rpmbuild
|
id: rpmbuild
|
||||||
run: |
|
run: |
|
||||||
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
|
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
|
||||||
|
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
|
||||||
make RPM
|
make RPM
|
||||||
|
|
||||||
# See: https://github.com/actions/upload-artifact
|
# See: https://github.com/actions/upload-artifact
|
||||||
@@ -165,7 +165,7 @@ jobs:
|
|||||||
# Use official golang package
|
# Use official golang package
|
||||||
- name: Install Golang
|
- name: Install Golang
|
||||||
run: |
|
run: |
|
||||||
wget -q https://go.dev/dl/go1.21.1.linux-amd64.tar.gz --output-document=- | \
|
wget -q https://go.dev/dl/go1.22.4.linux-amd64.tar.gz --output-document=- | \
|
||||||
tar --directory=/usr/local --extract --gzip
|
tar --directory=/usr/local --extract --gzip
|
||||||
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
|
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
|
||||||
go version
|
go version
|
||||||
|
22
.github/workflows/runonce.yml
vendored
22
.github/workflows/runonce.yml
vendored
@@ -91,16 +91,16 @@ jobs:
|
|||||||
# Use dnf to install build dependencies
|
# Use dnf to install build dependencies
|
||||||
- name: Install build dependencies
|
- name: Install build dependencies
|
||||||
run: |
|
run: |
|
||||||
dnf --assumeyes install \
|
wget -q https://go.dev/dl/go1.22.4.linux-amd64.tar.gz --output-document=- | \
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
|
tar --directory=/usr/local --extract --gzip
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
|
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.21.7-1.module_el8+960+4060efbe.noarch.rpm \
|
go version
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm
|
|
||||||
|
|
||||||
- name: RPM build MetricCollector
|
- name: RPM build MetricCollector
|
||||||
id: rpmbuild
|
id: rpmbuild
|
||||||
run: |
|
run: |
|
||||||
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
|
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
|
||||||
|
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
|
||||||
make RPM
|
make RPM
|
||||||
|
|
||||||
#
|
#
|
||||||
@@ -129,16 +129,16 @@ jobs:
|
|||||||
# Use dnf to install build dependencies
|
# Use dnf to install build dependencies
|
||||||
- name: Install build dependencies
|
- name: Install build dependencies
|
||||||
run: |
|
run: |
|
||||||
dnf --assumeyes --disableplugin=subscription-manager install \
|
wget -q https://go.dev/dl/go1.22.4.linux-amd64.tar.gz --output-document=- | \
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
|
tar --directory=/usr/local --extract --gzip
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-bin-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm \
|
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/golang-src-1.21.7-1.module_el8+960+4060efbe.noarch.rpm \
|
go version
|
||||||
http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/Packages/go-toolset-1.21.7-1.module_el8+960+4060efbe.x86_64.rpm
|
|
||||||
|
|
||||||
- name: RPM build MetricCollector
|
- name: RPM build MetricCollector
|
||||||
id: rpmbuild
|
id: rpmbuild
|
||||||
run: |
|
run: |
|
||||||
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
|
git config --global --add safe.directory /__w/cc-metric-collector/cc-metric-collector
|
||||||
|
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
|
||||||
make RPM
|
make RPM
|
||||||
|
|
||||||
#
|
#
|
||||||
@@ -165,7 +165,7 @@ jobs:
|
|||||||
# Use official golang package
|
# Use official golang package
|
||||||
- name: Install Golang
|
- name: Install Golang
|
||||||
run: |
|
run: |
|
||||||
wget -q https://go.dev/dl/go1.21.1.linux-amd64.tar.gz --output-document=- | \
|
wget -q https://go.dev/dl/go1.22.4.linux-amd64.tar.gz --output-document=- | \
|
||||||
tar --directory=/usr/local --extract --gzip
|
tar --directory=/usr/local --extract --gzip
|
||||||
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
|
export PATH=/usr/local/go/bin:/usr/local/go/pkg/tool/linux_amd64:$PATH
|
||||||
go version
|
go version
|
||||||
|
@@ -12,6 +12,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||||
)
|
)
|
||||||
@@ -54,15 +55,30 @@ func (m *IpmiCollector) Init(config json.RawMessage) error {
|
|||||||
// Check if executables ipmitool or ipmisensors are found
|
// Check if executables ipmitool or ipmisensors are found
|
||||||
p, err := exec.LookPath(m.config.IpmitoolPath)
|
p, err := exec.LookPath(m.config.IpmitoolPath)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
m.ipmitool = p
|
command := exec.Command(p)
|
||||||
|
err := command.Run()
|
||||||
|
if err != nil {
|
||||||
|
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %v", p, err.Error()))
|
||||||
|
m.ipmitool = ""
|
||||||
|
} else {
|
||||||
|
m.ipmitool = p
|
||||||
|
}
|
||||||
}
|
}
|
||||||
p, err = exec.LookPath(m.config.IpmisensorsPath)
|
p, err = exec.LookPath(m.config.IpmisensorsPath)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
m.ipmisensors = p
|
command := exec.Command(p)
|
||||||
|
err := command.Run()
|
||||||
|
if err != nil {
|
||||||
|
cclog.ComponentError(m.name, fmt.Sprintf("Failed to execute %s: %v", p, err.Error()))
|
||||||
|
m.ipmisensors = ""
|
||||||
|
} else {
|
||||||
|
m.ipmisensors = p
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 {
|
if len(m.ipmitool) == 0 && len(m.ipmisensors) == 0 {
|
||||||
return errors.New("no IPMI reader found")
|
return errors.New("no usable IPMI reader found")
|
||||||
}
|
}
|
||||||
|
|
||||||
m.init = true
|
m.init = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -119,8 +135,8 @@ func (m *IpmiCollector) readIpmiTool(cmd string, output chan lp.CCMetric) {
|
|||||||
cclog.ComponentError(
|
cclog.ComponentError(
|
||||||
m.name,
|
m.name,
|
||||||
fmt.Sprintf("readIpmiTool(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err),
|
fmt.Sprintf("readIpmiTool(): Failed to wait for the end of command \"%s\": %v\n", command.String(), err),
|
||||||
fmt.Sprintf("readIpmiTool(): command stderr: \"%s\"\n", string(errMsg)),
|
|
||||||
)
|
)
|
||||||
|
cclog.ComponentError(m.name, fmt.Sprintf("readIpmiTool(): command stderr: \"%s\"\n", strings.TrimSpace(string(errMsg))))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -12,8 +12,8 @@ The global file contains the paths to the other four files and some global optio
|
|||||||
"collectors" : "collectors.json",
|
"collectors" : "collectors.json",
|
||||||
"receivers" : "receivers.json",
|
"receivers" : "receivers.json",
|
||||||
"router" : "router.json",
|
"router" : "router.json",
|
||||||
"interval": 10,
|
"interval": "10s",
|
||||||
"duration": 1
|
"duration": "1s"
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@@ -4,6 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
@@ -13,10 +14,13 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type NatsReceiverConfig struct {
|
type NatsReceiverConfig struct {
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
Addr string `json:"address"`
|
Addr string `json:"address"`
|
||||||
Port string `json:"port"`
|
Port string `json:"port"`
|
||||||
Subject string `json:"subject"`
|
Subject string `json:"subject"`
|
||||||
|
User string `json:"user,omitempty"`
|
||||||
|
Password string `json:"password,omitempty"`
|
||||||
|
NkeyFile string `json:"nkey_file,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type NatsReceiver struct {
|
type NatsReceiver struct {
|
||||||
@@ -109,6 +113,7 @@ func (r *NatsReceiver) Close() {
|
|||||||
|
|
||||||
// NewNatsReceiver creates a new Receiver which subscribes to messages from a NATS server
|
// NewNatsReceiver creates a new Receiver which subscribes to messages from a NATS server
|
||||||
func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) {
|
func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) {
|
||||||
|
var uinfo nats.Option = nil
|
||||||
r := new(NatsReceiver)
|
r := new(NatsReceiver)
|
||||||
r.name = fmt.Sprintf("NatsReceiver(%s)", name)
|
r.name = fmt.Sprintf("NatsReceiver(%s)", name)
|
||||||
|
|
||||||
@@ -133,10 +138,22 @@ func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) {
|
|||||||
"source": r.name,
|
"source": r.name,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(r.config.User) > 0 && len(r.config.Password) > 0 {
|
||||||
|
uinfo = nats.UserInfo(r.config.User, r.config.Password)
|
||||||
|
} else if len(r.config.NkeyFile) > 0 {
|
||||||
|
_, err := os.Stat(r.config.NkeyFile)
|
||||||
|
if err == nil {
|
||||||
|
uinfo = nats.UserCredentials(r.config.NkeyFile)
|
||||||
|
} else {
|
||||||
|
cclog.ComponentError(r.name, "NKEY file", r.config.NkeyFile, "does not exist: %v", err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Connect to NATS server
|
// Connect to NATS server
|
||||||
url := fmt.Sprintf("nats://%s:%s", r.config.Addr, r.config.Port)
|
url := fmt.Sprintf("nats://%s:%s", r.config.Addr, r.config.Port)
|
||||||
cclog.ComponentDebug(r.name, "NewNatsReceiver", url, "Subject", r.config.Subject)
|
cclog.ComponentDebug(r.name, "NewNatsReceiver", url, "Subject", r.config.Subject)
|
||||||
if nc, err := nats.Connect(url); err == nil {
|
if nc, err := nats.Connect(url, uinfo); err == nil {
|
||||||
r.nc = nc
|
r.nc = nc
|
||||||
} else {
|
} else {
|
||||||
r.nc = nil
|
r.nc = nil
|
||||||
|
@@ -10,7 +10,10 @@ The `nats` receiver can be used receive metrics from the NATS network. The `nats
|
|||||||
"type": "nats",
|
"type": "nats",
|
||||||
"address" : "nats-server.example.org",
|
"address" : "nats-server.example.org",
|
||||||
"port" : "4222",
|
"port" : "4222",
|
||||||
"subject" : "subject"
|
"subject" : "subject",
|
||||||
|
"user": "natsuser",
|
||||||
|
"password": "natssecret",
|
||||||
|
"nkey_file": "/path/to/nkey_file"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@@ -19,6 +22,9 @@ The `nats` receiver can be used receive metrics from the NATS network. The `nats
|
|||||||
- `address`: Address of the NATS control server
|
- `address`: Address of the NATS control server
|
||||||
- `port`: Port of the NATS control server
|
- `port`: Port of the NATS control server
|
||||||
- `subject`: Subscribes to this subject and receive metrics
|
- `subject`: Subscribes to this subject and receive metrics
|
||||||
|
- `user`: Connect to nats using this user
|
||||||
|
- `password`: Connect to nats using this password
|
||||||
|
- `nkey_file`: Path to credentials file with NKEY
|
||||||
|
|
||||||
### Debugging
|
### Debugging
|
||||||
|
|
||||||
|
@@ -17,7 +17,7 @@ This folder contains the SinkManager and sink implementations for the cc-metric-
|
|||||||
The configuration file for the sinks is a list of configurations. The `type` field in each specifies which sink to initialize.
|
The configuration file for the sinks is a list of configurations. The `type` field in each specifies which sink to initialize.
|
||||||
|
|
||||||
```json
|
```json
|
||||||
[
|
{
|
||||||
"mystdout" : {
|
"mystdout" : {
|
||||||
"type" : "stdout",
|
"type" : "stdout",
|
||||||
"meta_as_tags" : [
|
"meta_as_tags" : [
|
||||||
@@ -31,7 +31,7 @@ The configuration file for the sinks is a list of configurations. The `type` fie
|
|||||||
"database" : "ccmetric",
|
"database" : "ccmetric",
|
||||||
"password" : "<jwt token>"
|
"password" : "<jwt token>"
|
||||||
}
|
}
|
||||||
]
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
|
@@ -1,352 +0,0 @@
|
|||||||
package sinks
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
|
||||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
|
||||||
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
"golang.org/x/exp/slices"
|
|
||||||
)
|
|
||||||
|
|
||||||
type AmqpSinkConfig struct {
|
|
||||||
// defines JSON tags for 'type' and 'meta_as_tags' (string list)
|
|
||||||
// See: metricSink.go
|
|
||||||
defaultSinkConfig
|
|
||||||
// Additional config options, for AmqpSink
|
|
||||||
QueueName string `json:"queue_name"`
|
|
||||||
// Maximum number of points sent to server in single request.
|
|
||||||
// Default: 1000
|
|
||||||
BatchSize int `json:"batch_size,omitempty"`
|
|
||||||
|
|
||||||
// Time interval for delayed sending of metrics.
|
|
||||||
// If the buffers are already filled before the end of this interval,
|
|
||||||
// the metrics are sent without further delay.
|
|
||||||
// Default: 1s
|
|
||||||
FlushInterval string `json:"flush_delay,omitempty"`
|
|
||||||
flushDelay time.Duration
|
|
||||||
|
|
||||||
Hostname string `json:"hostname"`
|
|
||||||
Port int `json:"port"`
|
|
||||||
PublishTimeout string `json:"publish_timeout,omitempty"`
|
|
||||||
publishTimeout time.Duration
|
|
||||||
Username string `json:"username,omitempty"`
|
|
||||||
Password string `json:"password,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type AmqpSink struct {
|
|
||||||
// declares elements 'name' and 'meta_as_tags' (string to bool map!)
|
|
||||||
sink
|
|
||||||
config AmqpSinkConfig // entry point to the AmqpSinkConfig
|
|
||||||
// influx line protocol encoder
|
|
||||||
encoder influx.Encoder
|
|
||||||
// number of records stored in the encoder
|
|
||||||
numRecordsInEncoder int
|
|
||||||
// List of tags and meta data tags which should be used as tags
|
|
||||||
extended_tag_list []key_value_pair
|
|
||||||
// Flush() runs in another goroutine and accesses the influx line protocol encoder,
|
|
||||||
// so this encoderLock has to protect the encoder and numRecordsInEncoder
|
|
||||||
encoderLock sync.Mutex
|
|
||||||
|
|
||||||
// timer to run Flush()
|
|
||||||
flushTimer *time.Timer
|
|
||||||
// Lock to assure that only one timer is running at a time
|
|
||||||
timerLock sync.Mutex
|
|
||||||
|
|
||||||
// WaitGroup to ensure only one send operation is running at a time
|
|
||||||
sendWaitGroup sync.WaitGroup
|
|
||||||
|
|
||||||
client *amqp.Connection
|
|
||||||
channel *amqp.Channel
|
|
||||||
queue amqp.Queue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implement functions required for Sink interface
|
|
||||||
// Write(...), Flush(), Close()
|
|
||||||
// See: metricSink.go
|
|
||||||
|
|
||||||
// Code to submit a single CCMetric to the sink
|
|
||||||
func (s *AmqpSink) Write(m lp.CCMetric) error {
|
|
||||||
|
|
||||||
// Lock for encoder usage
|
|
||||||
s.encoderLock.Lock()
|
|
||||||
|
|
||||||
// Encode measurement name
|
|
||||||
s.encoder.StartLine(m.Name())
|
|
||||||
|
|
||||||
// copy tags and meta data which should be used as tags
|
|
||||||
s.extended_tag_list = s.extended_tag_list[:0]
|
|
||||||
for key, value := range m.Tags() {
|
|
||||||
s.extended_tag_list =
|
|
||||||
append(
|
|
||||||
s.extended_tag_list,
|
|
||||||
key_value_pair{
|
|
||||||
key: key,
|
|
||||||
value: value,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
for _, key := range s.config.MetaAsTags {
|
|
||||||
if value, ok := m.GetMeta(key); ok {
|
|
||||||
s.extended_tag_list =
|
|
||||||
append(
|
|
||||||
s.extended_tag_list,
|
|
||||||
key_value_pair{
|
|
||||||
key: key,
|
|
||||||
value: value,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode tags (they musts be in lexical order)
|
|
||||||
slices.SortFunc(
|
|
||||||
s.extended_tag_list,
|
|
||||||
func(a key_value_pair, b key_value_pair) int {
|
|
||||||
if a.key < b.key {
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
if a.key > b.key {
|
|
||||||
return +1
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
},
|
|
||||||
)
|
|
||||||
for i := range s.extended_tag_list {
|
|
||||||
s.encoder.AddTag(
|
|
||||||
s.extended_tag_list[i].key,
|
|
||||||
s.extended_tag_list[i].value,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode fields
|
|
||||||
for key, value := range m.Fields() {
|
|
||||||
s.encoder.AddField(key, influx.MustNewValue(value))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode time stamp
|
|
||||||
s.encoder.EndLine(m.Time())
|
|
||||||
|
|
||||||
// Check for encoder errors
|
|
||||||
if err := s.encoder.Err(); err != nil {
|
|
||||||
// Unlock encoder usage
|
|
||||||
s.encoderLock.Unlock()
|
|
||||||
|
|
||||||
return fmt.Errorf("encoding failed: %v", err)
|
|
||||||
}
|
|
||||||
s.numRecordsInEncoder++
|
|
||||||
|
|
||||||
if s.config.flushDelay == 0 {
|
|
||||||
// Unlock encoder usage
|
|
||||||
s.encoderLock.Unlock()
|
|
||||||
|
|
||||||
// Directly flush if no flush delay is configured
|
|
||||||
return s.Flush()
|
|
||||||
} else if s.numRecordsInEncoder == s.config.BatchSize {
|
|
||||||
// Unlock encoder usage
|
|
||||||
s.encoderLock.Unlock()
|
|
||||||
|
|
||||||
// Stop flush timer
|
|
||||||
if s.flushTimer != nil {
|
|
||||||
if ok := s.flushTimer.Stop(); ok {
|
|
||||||
cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay")
|
|
||||||
s.timerLock.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Flush if batch size is reached
|
|
||||||
return s.Flush()
|
|
||||||
} else if s.timerLock.TryLock() {
|
|
||||||
|
|
||||||
// Setup flush timer when flush delay is configured
|
|
||||||
// and no other timer is already running
|
|
||||||
if s.flushTimer != nil {
|
|
||||||
|
|
||||||
// Restarting existing flush timer
|
|
||||||
cclog.ComponentDebug(s.name, "Write(): Restarting flush timer")
|
|
||||||
s.flushTimer.Reset(s.config.flushDelay)
|
|
||||||
} else {
|
|
||||||
|
|
||||||
// Creating and starting flush timer
|
|
||||||
cclog.ComponentDebug(s.name, "Write(): Starting new flush timer")
|
|
||||||
s.flushTimer = time.AfterFunc(
|
|
||||||
s.config.flushDelay,
|
|
||||||
func() {
|
|
||||||
defer s.timerLock.Unlock()
|
|
||||||
cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer")
|
|
||||||
if err := s.Flush(); err != nil {
|
|
||||||
cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unlock encoder usage
|
|
||||||
s.encoderLock.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the sink uses batched sends internally, you can tell to flush its buffers
|
|
||||||
func (s *AmqpSink) Flush() error {
|
|
||||||
|
|
||||||
// Lock for encoder usage
|
|
||||||
// Own lock for as short as possible: the time it takes to clone the buffer.
|
|
||||||
s.encoderLock.Lock()
|
|
||||||
|
|
||||||
buf := slices.Clone(s.encoder.Bytes())
|
|
||||||
numRecordsInBuf := s.numRecordsInEncoder
|
|
||||||
s.encoder.Reset()
|
|
||||||
s.numRecordsInEncoder = 0
|
|
||||||
|
|
||||||
// Unlock encoder usage
|
|
||||||
s.encoderLock.Unlock()
|
|
||||||
|
|
||||||
if len(buf) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics")
|
|
||||||
|
|
||||||
// Asynchron send of encoder metrics
|
|
||||||
s.sendWaitGroup.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer s.sendWaitGroup.Done()
|
|
||||||
//startTime := time.Now()
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), s.config.flushDelay)
|
|
||||||
defer cancel()
|
|
||||||
err := s.channel.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{
|
|
||||||
ContentType: "text/plain",
|
|
||||||
Body: buf,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
cclog.ComponentError(s.name, err.Error())
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close sink: close network connection, close files, close libraries, ...
|
|
||||||
func (s *AmqpSink) Close() {
|
|
||||||
|
|
||||||
cclog.ComponentDebug(s.name, "CLOSE")
|
|
||||||
|
|
||||||
// Stop existing timer and immediately flush
|
|
||||||
if s.flushTimer != nil {
|
|
||||||
if ok := s.flushTimer.Stop(); ok {
|
|
||||||
s.timerLock.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Flush
|
|
||||||
if err := s.Flush(); err != nil {
|
|
||||||
cclog.ComponentError(s.name, "Close():", "Flush failed:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for send operations to finish
|
|
||||||
s.sendWaitGroup.Wait()
|
|
||||||
|
|
||||||
s.client.Close()
|
|
||||||
s.client = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// New function to create a new instance of the sink
|
|
||||||
// Initialize the sink by giving it a name and reading in the config JSON
|
|
||||||
func NewAmqpSink(name string, config json.RawMessage) (Sink, error) {
|
|
||||||
s := new(AmqpSink)
|
|
||||||
|
|
||||||
// Set name of sampleSink
|
|
||||||
// The name should be chosen in such a way that different instances of AmqpSink can be distinguished
|
|
||||||
s.name = fmt.Sprintf("AmqpSink(%s)", name) // Always specify a name here
|
|
||||||
|
|
||||||
// Set defaults in s.config
|
|
||||||
// Allow overwriting these defaults by reading config JSON
|
|
||||||
|
|
||||||
s.config.PublishTimeout = "4s"
|
|
||||||
s.config.publishTimeout = time.Duration(4) * time.Second
|
|
||||||
s.config.Hostname = "localhost"
|
|
||||||
s.config.Port = 1883
|
|
||||||
|
|
||||||
// Read in the config JSON
|
|
||||||
if len(config) > 0 {
|
|
||||||
d := json.NewDecoder(bytes.NewReader(config))
|
|
||||||
d.DisallowUnknownFields()
|
|
||||||
if err := d.Decode(&s.config); err != nil {
|
|
||||||
cclog.ComponentError(s.name, "Error reading config:", err.Error())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create lookup map to use meta infos as tags in the output metric
|
|
||||||
s.meta_as_tags = make(map[string]bool)
|
|
||||||
for _, k := range s.config.MetaAsTags {
|
|
||||||
s.meta_as_tags[k] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if all required fields in the config are set
|
|
||||||
// E.g. use 'len(s.config.Option) > 0' for string settings
|
|
||||||
if t, err := time.ParseDuration(s.config.PublishTimeout); err == nil {
|
|
||||||
s.config.publishTimeout = t
|
|
||||||
} else {
|
|
||||||
err := fmt.Errorf("to parse duration for PublishTimeout: %s", s.config.PublishTimeout)
|
|
||||||
cclog.ComponentError(s.name, err.Error())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if t, err := time.ParseDuration(s.config.FlushInterval); err == nil {
|
|
||||||
s.config.flushDelay = t
|
|
||||||
} else {
|
|
||||||
err := fmt.Errorf("to parse duration for FlushInterval: %s", s.config.FlushInterval)
|
|
||||||
cclog.ComponentError(s.name, err.Error())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
url := net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port))
|
|
||||||
userpart := ""
|
|
||||||
if len(s.config.Username) > 0 {
|
|
||||||
userpart = s.config.Username
|
|
||||||
if len(s.config.Password) > 0 {
|
|
||||||
userpart += ":" + s.config.Password
|
|
||||||
}
|
|
||||||
userpart += "@"
|
|
||||||
}
|
|
||||||
url = fmt.Sprintf("amqp://%s%s", userpart, url)
|
|
||||||
|
|
||||||
// Establish connection to the server, library, ...
|
|
||||||
// Check required files exist and lookup path(s) of executable(s)
|
|
||||||
c, err := amqp.Dial(url)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
s.client = c
|
|
||||||
|
|
||||||
ch, err := c.Channel()
|
|
||||||
if err != nil {
|
|
||||||
s.client.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
s.channel = ch
|
|
||||||
|
|
||||||
q, err := ch.QueueDeclare(
|
|
||||||
s.config.QueueName, // name
|
|
||||||
false, // durable
|
|
||||||
false, // delete when unused
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-wait
|
|
||||||
nil, // arguments
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
s.channel.Close()
|
|
||||||
s.client.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
s.queue = q
|
|
||||||
|
|
||||||
// Return (nil, meaningful error message) in case of errors
|
|
||||||
return s, nil
|
|
||||||
}
|
|
@@ -1,33 +0,0 @@
|
|||||||
## `amqp` sink
|
|
||||||
|
|
||||||
The `amqp` sink publishes all metrics into a RabbitMQ network. The publishing key is the queue name in the configuration file
|
|
||||||
|
|
||||||
### Configuration structure
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"<name>": {
|
|
||||||
"type": "amqp",
|
|
||||||
"queue_name" : "myqueue",
|
|
||||||
"batch_size" : 1000,
|
|
||||||
"flush_delay": "4s",
|
|
||||||
"publish_timeout": "1s",
|
|
||||||
"host": "dbhost.example.com",
|
|
||||||
"port": 5672,
|
|
||||||
"username": "exampleuser",
|
|
||||||
"password" : "examplepw",
|
|
||||||
"meta_as_tags" : [],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
- `type`: makes the sink an `amqp` sink, also `rabbitmq` is allowed as alias
|
|
||||||
- `queue_name`: All metrics are published to this queue
|
|
||||||
- `host`: Hostname of the RabbitMQ server
|
|
||||||
- `port`: Port number of the RabbitMQ server
|
|
||||||
- `username`: Username for basic authentication
|
|
||||||
- `password`: Password for basic authentication
|
|
||||||
- `meta_as_tags`: print all meta information as tags in the output (optional)
|
|
||||||
- `publish_timeout`: Timeout for each publication operation (default `1s`)
|
|
||||||
- `flush_delay`: Group metrics coming in to a single batch (default `4s`)
|
|
||||||
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay (default: `1000`)
|
|
@@ -5,6 +5,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -22,6 +23,7 @@ type NatsSinkConfig struct {
|
|||||||
User string `json:"user,omitempty"`
|
User string `json:"user,omitempty"`
|
||||||
Password string `json:"password,omitempty"`
|
Password string `json:"password,omitempty"`
|
||||||
FlushDelay string `json:"flush_delay,omitempty"`
|
FlushDelay string `json:"flush_delay,omitempty"`
|
||||||
|
NkeyFile string `json:"nkey_file,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type NatsSink struct {
|
type NatsSink struct {
|
||||||
@@ -42,6 +44,13 @@ func (s *NatsSink) connect() error {
|
|||||||
var nc *nats.Conn
|
var nc *nats.Conn
|
||||||
if len(s.config.User) > 0 && len(s.config.Password) > 0 {
|
if len(s.config.User) > 0 && len(s.config.Password) > 0 {
|
||||||
uinfo = nats.UserInfo(s.config.User, s.config.Password)
|
uinfo = nats.UserInfo(s.config.User, s.config.Password)
|
||||||
|
} else if len(s.config.NkeyFile) > 0 {
|
||||||
|
if _, err := os.Stat(s.config.NkeyFile); err == nil {
|
||||||
|
uinfo = nats.UserCredentials(s.config.NkeyFile)
|
||||||
|
} else {
|
||||||
|
cclog.ComponentError(s.name, "NKEY file", s.config.NkeyFile, "does not exist: %v", err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
uri := fmt.Sprintf("nats://%s:%s", s.config.Host, s.config.Port)
|
uri := fmt.Sprintf("nats://%s:%s", s.config.Host, s.config.Port)
|
||||||
cclog.ComponentDebug(s.name, "Connect to", uri)
|
cclog.ComponentDebug(s.name, "Connect to", uri)
|
||||||
|
@@ -13,6 +13,7 @@ The `nats` sink publishes all metrics into a NATS network. The publishing key is
|
|||||||
"port": "4222",
|
"port": "4222",
|
||||||
"user": "exampleuser",
|
"user": "exampleuser",
|
||||||
"password" : "examplepw",
|
"password" : "examplepw",
|
||||||
|
"nkey_file": "/path/to/nkey_file",
|
||||||
"meta_as_tags" : [],
|
"meta_as_tags" : [],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -25,3 +26,4 @@ The `nats` sink publishes all metrics into a NATS network. The publishing key is
|
|||||||
- `user`: Username for basic authentication
|
- `user`: Username for basic authentication
|
||||||
- `password`: Password for basic authentication
|
- `password`: Password for basic authentication
|
||||||
- `meta_as_tags`: print all meta information as tags in the output (optional)
|
- `meta_as_tags`: print all meta information as tags in the output (optional)
|
||||||
|
- `nkey_file`: Path to credentials file with NKEY
|
||||||
|
@@ -21,8 +21,6 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink,
|
|||||||
"influxdb": NewInfluxSink,
|
"influxdb": NewInfluxSink,
|
||||||
"influxasync": NewInfluxAsyncSink,
|
"influxasync": NewInfluxAsyncSink,
|
||||||
"http": NewHttpSink,
|
"http": NewHttpSink,
|
||||||
"amqp": NewAmqpSink,
|
|
||||||
"rabbitmq": NewAmqpSink,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metric collector manager data structure
|
// Metric collector manager data structure
|
||||||
|
Reference in New Issue
Block a user