mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-10-21 13:25:07 +02:00
Compare commits
1 Commits
mqtt_sink
...
syslog_sin
Author | SHA1 | Date | |
---|---|---|---|
|
f36a0c13b0 |
@@ -99,7 +99,10 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
output <- lp.FromInfluxMetric(c)
|
y := lp.FromInfluxMetric(c)
|
||||||
|
if err == nil {
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, file := range m.files {
|
for _, file := range m.files {
|
||||||
@@ -118,7 +121,10 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri
|
|||||||
if skip {
|
if skip {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
output <- lp.FromInfluxMetric(f)
|
y := lp.FromInfluxMetric(f)
|
||||||
|
if err == nil {
|
||||||
|
output <- y
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,372 +0,0 @@
|
|||||||
package sinks
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
|
||||||
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
|
||||||
"github.com/go-mqtt/mqtt"
|
|
||||||
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
|
|
||||||
"golang.org/x/exp/slices"
|
|
||||||
)
|
|
||||||
|
|
||||||
type MqttSinkConfig struct {
|
|
||||||
// defines JSON tags for 'type' and 'meta_as_tags' (string list)
|
|
||||||
// See: metricSink.go
|
|
||||||
defaultSinkConfig
|
|
||||||
// Additional config options, for MqttSink
|
|
||||||
ClientID string `json:"client_id"`
|
|
||||||
PersistenceDirectory string `json:"persistence_directory,omitempty"`
|
|
||||||
// Maximum number of points sent to server in single request.
|
|
||||||
// Default: 1000
|
|
||||||
BatchSize int `json:"batch_size,omitempty"`
|
|
||||||
|
|
||||||
// Time interval for delayed sending of metrics.
|
|
||||||
// If the buffers are already filled before the end of this interval,
|
|
||||||
// the metrics are sent without further delay.
|
|
||||||
// Default: 1s
|
|
||||||
FlushInterval string `json:"flush_delay,omitempty"`
|
|
||||||
flushDelay time.Duration
|
|
||||||
|
|
||||||
DialProtocol string `json:"dial_protocol"`
|
|
||||||
Hostname string `json:"hostname"`
|
|
||||||
Port int `json:"port"`
|
|
||||||
PauseTimeout string `json:"pause_timeout"`
|
|
||||||
pauseTimeout time.Duration
|
|
||||||
KeepAlive uint16 `json:"keep_alive_seconds"`
|
|
||||||
Username string `json:"username,omitempty"`
|
|
||||||
Password string `json:"password,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type MqttSink struct {
|
|
||||||
// declares elements 'name' and 'meta_as_tags' (string to bool map!)
|
|
||||||
sink
|
|
||||||
config MqttSinkConfig // entry point to the MqttSinkConfig
|
|
||||||
// influx line protocol encoder
|
|
||||||
encoder influx.Encoder
|
|
||||||
// number of records stored in the encoder
|
|
||||||
numRecordsInEncoder int
|
|
||||||
// List of tags and meta data tags which should be used as tags
|
|
||||||
extended_tag_list []key_value_pair
|
|
||||||
// Flush() runs in another goroutine and accesses the influx line protocol encoder,
|
|
||||||
// so this encoderLock has to protect the encoder and numRecordsInEncoder
|
|
||||||
encoderLock sync.Mutex
|
|
||||||
|
|
||||||
// timer to run Flush()
|
|
||||||
flushTimer *time.Timer
|
|
||||||
// Lock to assure that only one timer is running at a time
|
|
||||||
timerLock sync.Mutex
|
|
||||||
|
|
||||||
// WaitGroup to ensure only one send operation is running at a time
|
|
||||||
sendWaitGroup sync.WaitGroup
|
|
||||||
|
|
||||||
client *mqtt.Client
|
|
||||||
mqttconfig mqtt.Config
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implement functions required for Sink interface
|
|
||||||
// Write(...), Flush(), Close()
|
|
||||||
// See: metricSink.go
|
|
||||||
|
|
||||||
// Code to submit a single CCMetric to the sink
|
|
||||||
func (s *MqttSink) Write(m lp.CCMetric) error {
|
|
||||||
|
|
||||||
// Lock for encoder usage
|
|
||||||
s.encoderLock.Lock()
|
|
||||||
|
|
||||||
// Encode measurement name
|
|
||||||
s.encoder.StartLine(m.Name())
|
|
||||||
|
|
||||||
// copy tags and meta data which should be used as tags
|
|
||||||
s.extended_tag_list = s.extended_tag_list[:0]
|
|
||||||
for key, value := range m.Tags() {
|
|
||||||
s.extended_tag_list =
|
|
||||||
append(
|
|
||||||
s.extended_tag_list,
|
|
||||||
key_value_pair{
|
|
||||||
key: key,
|
|
||||||
value: value,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
for _, key := range s.config.MetaAsTags {
|
|
||||||
if value, ok := m.GetMeta(key); ok {
|
|
||||||
s.extended_tag_list =
|
|
||||||
append(
|
|
||||||
s.extended_tag_list,
|
|
||||||
key_value_pair{
|
|
||||||
key: key,
|
|
||||||
value: value,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode tags (they musts be in lexical order)
|
|
||||||
slices.SortFunc(
|
|
||||||
s.extended_tag_list,
|
|
||||||
func(a key_value_pair, b key_value_pair) int {
|
|
||||||
if a.key < b.key {
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
if a.key > b.key {
|
|
||||||
return +1
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
},
|
|
||||||
)
|
|
||||||
for i := range s.extended_tag_list {
|
|
||||||
s.encoder.AddTag(
|
|
||||||
s.extended_tag_list[i].key,
|
|
||||||
s.extended_tag_list[i].value,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode fields
|
|
||||||
for key, value := range m.Fields() {
|
|
||||||
s.encoder.AddField(key, influx.MustNewValue(value))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode time stamp
|
|
||||||
s.encoder.EndLine(m.Time())
|
|
||||||
|
|
||||||
// Check for encoder errors
|
|
||||||
if err := s.encoder.Err(); err != nil {
|
|
||||||
// Unlock encoder usage
|
|
||||||
s.encoderLock.Unlock()
|
|
||||||
|
|
||||||
return fmt.Errorf("encoding failed: %v", err)
|
|
||||||
}
|
|
||||||
s.numRecordsInEncoder++
|
|
||||||
|
|
||||||
if s.config.flushDelay == 0 {
|
|
||||||
// Unlock encoder usage
|
|
||||||
s.encoderLock.Unlock()
|
|
||||||
|
|
||||||
// Directly flush if no flush delay is configured
|
|
||||||
return s.Flush()
|
|
||||||
} else if s.numRecordsInEncoder == s.config.BatchSize {
|
|
||||||
// Unlock encoder usage
|
|
||||||
s.encoderLock.Unlock()
|
|
||||||
|
|
||||||
// Stop flush timer
|
|
||||||
if s.flushTimer != nil {
|
|
||||||
if ok := s.flushTimer.Stop(); ok {
|
|
||||||
cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay")
|
|
||||||
s.timerLock.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Flush if batch size is reached
|
|
||||||
return s.Flush()
|
|
||||||
} else if s.timerLock.TryLock() {
|
|
||||||
|
|
||||||
// Setup flush timer when flush delay is configured
|
|
||||||
// and no other timer is already running
|
|
||||||
if s.flushTimer != nil {
|
|
||||||
|
|
||||||
// Restarting existing flush timer
|
|
||||||
cclog.ComponentDebug(s.name, "Write(): Restarting flush timer")
|
|
||||||
s.flushTimer.Reset(s.config.flushDelay)
|
|
||||||
} else {
|
|
||||||
|
|
||||||
// Creating and starting flush timer
|
|
||||||
cclog.ComponentDebug(s.name, "Write(): Starting new flush timer")
|
|
||||||
s.flushTimer = time.AfterFunc(
|
|
||||||
s.config.flushDelay,
|
|
||||||
func() {
|
|
||||||
defer s.timerLock.Unlock()
|
|
||||||
cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer")
|
|
||||||
if err := s.Flush(); err != nil {
|
|
||||||
cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unlock encoder usage
|
|
||||||
s.encoderLock.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the sink uses batched sends internally, you can tell to flush its buffers
|
|
||||||
func (s *MqttSink) Flush() error {
|
|
||||||
|
|
||||||
// Lock for encoder usage
|
|
||||||
// Own lock for as short as possible: the time it takes to clone the buffer.
|
|
||||||
s.encoderLock.Lock()
|
|
||||||
|
|
||||||
buf := slices.Clone(s.encoder.Bytes())
|
|
||||||
numRecordsInBuf := s.numRecordsInEncoder
|
|
||||||
s.encoder.Reset()
|
|
||||||
s.numRecordsInEncoder = 0
|
|
||||||
|
|
||||||
// Unlock encoder usage
|
|
||||||
s.encoderLock.Unlock()
|
|
||||||
|
|
||||||
if len(buf) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics")
|
|
||||||
|
|
||||||
// Asynchron send of encoder metrics
|
|
||||||
s.sendWaitGroup.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer s.sendWaitGroup.Done()
|
|
||||||
//startTime := time.Now()
|
|
||||||
for {
|
|
||||||
exchange, err := s.client.PublishAtLeastOnce(buf, s.config.ClientID)
|
|
||||||
switch {
|
|
||||||
case err == nil:
|
|
||||||
return
|
|
||||||
|
|
||||||
case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed):
|
|
||||||
return
|
|
||||||
|
|
||||||
case errors.Is(err, mqtt.ErrMax):
|
|
||||||
time.Sleep(s.config.pauseTimeout)
|
|
||||||
|
|
||||||
default:
|
|
||||||
time.Sleep(s.config.pauseTimeout)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
for err := range exchange {
|
|
||||||
if errors.Is(err, mqtt.ErrClosed) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close sink: close network connection, close files, close libraries, ...
|
|
||||||
func (s *MqttSink) Close() {
|
|
||||||
|
|
||||||
cclog.ComponentDebug(s.name, "CLOSE")
|
|
||||||
|
|
||||||
// Stop existing timer and immediately flush
|
|
||||||
if s.flushTimer != nil {
|
|
||||||
if ok := s.flushTimer.Stop(); ok {
|
|
||||||
s.timerLock.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Flush
|
|
||||||
if err := s.Flush(); err != nil {
|
|
||||||
cclog.ComponentError(s.name, "Close():", "Flush failed:", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for send operations to finish
|
|
||||||
s.sendWaitGroup.Wait()
|
|
||||||
|
|
||||||
s.client.Close()
|
|
||||||
s.client = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// New function to create a new instance of the sink
|
|
||||||
// Initialize the sink by giving it a name and reading in the config JSON
|
|
||||||
func NewMqttSink(name string, config json.RawMessage) (Sink, error) {
|
|
||||||
s := new(MqttSink)
|
|
||||||
|
|
||||||
// Set name of sampleSink
|
|
||||||
// The name should be chosen in such a way that different instances of MqttSink can be distinguished
|
|
||||||
s.name = fmt.Sprintf("MqttSink(%s)", name) // Always specify a name here
|
|
||||||
|
|
||||||
// Set defaults in s.config
|
|
||||||
// Allow overwriting these defaults by reading config JSON
|
|
||||||
|
|
||||||
s.config.PauseTimeout = "4s"
|
|
||||||
s.config.pauseTimeout = time.Duration(4) * time.Second
|
|
||||||
s.config.DialProtocol = "tcp"
|
|
||||||
s.config.Hostname = "localhost"
|
|
||||||
s.config.Port = 1883
|
|
||||||
|
|
||||||
// Read in the config JSON
|
|
||||||
if len(config) > 0 {
|
|
||||||
d := json.NewDecoder(bytes.NewReader(config))
|
|
||||||
d.DisallowUnknownFields()
|
|
||||||
if err := d.Decode(&s.config); err != nil {
|
|
||||||
cclog.ComponentError(s.name, "Error reading config:", err.Error())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create lookup map to use meta infos as tags in the output metric
|
|
||||||
s.meta_as_tags = make(map[string]bool)
|
|
||||||
for _, k := range s.config.MetaAsTags {
|
|
||||||
s.meta_as_tags[k] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if all required fields in the config are set
|
|
||||||
// E.g. use 'len(s.config.Option) > 0' for string settings
|
|
||||||
if t, err := time.ParseDuration(s.config.PauseTimeout); err == nil {
|
|
||||||
s.config.pauseTimeout = t
|
|
||||||
} else {
|
|
||||||
err := fmt.Errorf("to parse duration for PauseTimeout: %s", s.config.PauseTimeout)
|
|
||||||
cclog.ComponentError(s.name, err.Error())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if t, err := time.ParseDuration(s.config.FlushInterval); err == nil {
|
|
||||||
s.config.flushDelay = t
|
|
||||||
} else {
|
|
||||||
err := fmt.Errorf("to parse duration for FlushInterval: %s", s.config.FlushInterval)
|
|
||||||
cclog.ComponentError(s.name, err.Error())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch s.config.DialProtocol {
|
|
||||||
case "tcp", "udp":
|
|
||||||
default:
|
|
||||||
err := errors.New("failed to parse dial protocol, allowed: tcp, udp")
|
|
||||||
cclog.ComponentError(s.name, err.Error())
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var persistence mqtt.Persistence
|
|
||||||
if len(s.config.PersistenceDirectory) > 0 {
|
|
||||||
persistence = mqtt.FileSystem(s.config.PersistenceDirectory)
|
|
||||||
} else {
|
|
||||||
tmpdir, err := os.MkdirTemp("", "mqtt")
|
|
||||||
if err == nil {
|
|
||||||
persistence = mqtt.FileSystem(tmpdir)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Establish connection to the server, library, ...
|
|
||||||
// Check required files exist and lookup path(s) of executable(s)
|
|
||||||
|
|
||||||
dialer := mqtt.NewDialer(s.config.DialProtocol, net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port)))
|
|
||||||
|
|
||||||
s.mqttconfig = mqtt.Config{
|
|
||||||
Dialer: dialer,
|
|
||||||
PauseTimeout: s.config.pauseTimeout,
|
|
||||||
KeepAlive: uint16(s.config.KeepAlive),
|
|
||||||
}
|
|
||||||
if len(s.config.Username) > 0 {
|
|
||||||
s.mqttconfig.UserName = s.config.Username
|
|
||||||
}
|
|
||||||
if len(s.config.Password) > 0 {
|
|
||||||
s.mqttconfig.Password = []byte(s.config.Password)
|
|
||||||
}
|
|
||||||
|
|
||||||
client, err := mqtt.InitSession(s.config.ClientID, persistence, &s.mqttconfig)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
s.client = client
|
|
||||||
|
|
||||||
// Return (nil, meaningful error message) in case of errors
|
|
||||||
return s, nil
|
|
||||||
}
|
|
@@ -1,39 +0,0 @@
|
|||||||
## `mqtt` sink
|
|
||||||
|
|
||||||
The `mqtt` sink publishes all metrics into a MQTT network.
|
|
||||||
|
|
||||||
### Configuration structure
|
|
||||||
|
|
||||||
```json
|
|
||||||
{
|
|
||||||
"<name>": {
|
|
||||||
"type": "mqtt",
|
|
||||||
"client_id" : "myid",
|
|
||||||
"persistence_directory": "/tmp",
|
|
||||||
"batch_size": 1000,
|
|
||||||
"flush_delay": "1s",
|
|
||||||
"dial_protocol": "tcp",
|
|
||||||
"host": "dbhost.example.com",
|
|
||||||
"port": 1883,
|
|
||||||
"user": "exampleuser",
|
|
||||||
"password" : "examplepw",
|
|
||||||
"pause_timeout": "1s",
|
|
||||||
"keep_alive_seconds": 10,
|
|
||||||
"meta_as_tags" : [],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
- `type`: makes the sink an `mqtt` sink
|
|
||||||
- `client_id`: MQTT clients use a client_id to talk to the broker
|
|
||||||
- `persistence_directory`: MQTT stores messages temporarly on disc if the broker is not available. Folder needs to be writable (default: `/tmp`)
|
|
||||||
- `pause_timeout`: Waittime when published failed
|
|
||||||
- `keep_alive_seconds`: Keep the connection alive for some time. Recommended to be longer than global `interval`.
|
|
||||||
- `flush_delay`: Group metrics coming in to a single batch
|
|
||||||
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay
|
|
||||||
- `dial_protocol`: Use `tcp` or `udp` for the MQTT communication
|
|
||||||
- `host`: Hostname of the MQTT broker
|
|
||||||
- `port`: Port number of the MQTT broker
|
|
||||||
- `user`: Username for authentication
|
|
||||||
- `password`: Password for authentication
|
|
||||||
- `meta_as_tags`: print all meta information as tags in the output (optional)
|
|
@@ -21,7 +21,6 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink,
|
|||||||
"influxdb": NewInfluxSink,
|
"influxdb": NewInfluxSink,
|
||||||
"influxasync": NewInfluxAsyncSink,
|
"influxasync": NewInfluxAsyncSink,
|
||||||
"http": NewHttpSink,
|
"http": NewHttpSink,
|
||||||
"mqtt": NewMqttSink,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metric collector manager data structure
|
// Metric collector manager data structure
|
||||||
|
159
sinks/syslogSink.go
Normal file
159
sinks/syslogSink.go
Normal file
@@ -0,0 +1,159 @@
|
|||||||
|
package sinks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"log/syslog"
|
||||||
|
|
||||||
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
|
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
|
||||||
|
)
|
||||||
|
|
||||||
|
var AvailableSyslogPriorities map[string]syslog.Priority = map[string]syslog.Priority{
|
||||||
|
"LOG_EMERG": syslog.LOG_EMERG,
|
||||||
|
"LOG_ALERT": syslog.LOG_ALERT,
|
||||||
|
"LOG_CRIT": syslog.LOG_CRIT,
|
||||||
|
"LOG_ERR": syslog.LOG_ERR,
|
||||||
|
"LOG_NOTICE": syslog.LOG_NOTICE,
|
||||||
|
"LOG_INFO": syslog.LOG_INFO,
|
||||||
|
"LOG_DEBUG": syslog.LOG_DEBUG,
|
||||||
|
"LOG_USER": syslog.LOG_USER,
|
||||||
|
"LOG_MAIL": syslog.LOG_MAIL,
|
||||||
|
"LOG_DAEMON": syslog.LOG_DAEMON,
|
||||||
|
"LOG_AUTH": syslog.LOG_AUTH,
|
||||||
|
"LOG_SYSLOG": syslog.LOG_SYSLOG,
|
||||||
|
"LOG_LPR": syslog.LOG_LPR,
|
||||||
|
"LOG_NEWS": syslog.LOG_NEWS,
|
||||||
|
"LOG_UUCP": syslog.LOG_UUCP,
|
||||||
|
"LOG_CRON": syslog.LOG_CRON,
|
||||||
|
"LOG_AUTHPRIV": syslog.LOG_AUTHPRIV,
|
||||||
|
"LOG_FTP": syslog.LOG_FTP,
|
||||||
|
"LOG_LOCAL0": syslog.LOG_LOCAL0,
|
||||||
|
"LOG_LOCAL1": syslog.LOG_LOCAL1,
|
||||||
|
"LOG_LOCAL2": syslog.LOG_LOCAL2,
|
||||||
|
"LOG_LOCAL3": syslog.LOG_LOCAL3,
|
||||||
|
"LOG_LOCAL4": syslog.LOG_LOCAL4,
|
||||||
|
"LOG_LOCAL5": syslog.LOG_LOCAL5,
|
||||||
|
"LOG_LOCAL6": syslog.LOG_LOCAL6,
|
||||||
|
"LOG_LOCAL7": syslog.LOG_LOCAL7,
|
||||||
|
}
|
||||||
|
|
||||||
|
type SyslogSinkConfig struct {
|
||||||
|
defaultSinkConfig
|
||||||
|
SyslogTag string `json:"syslog_tag"`
|
||||||
|
SyslogPriories []string `json:"syslog_priorities"`
|
||||||
|
SyslogWriter string `json:"syslog_writer"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type SyslogSink struct {
|
||||||
|
sink
|
||||||
|
config SyslogSinkConfig // entry point to the SyslogSinkConfig
|
||||||
|
syslogPrios syslog.Priority
|
||||||
|
out *syslog.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SyslogSink) Write(point lp.CCMetric) error {
|
||||||
|
var err error = nil
|
||||||
|
p := point.ToLineProtocol(s.meta_as_tags)
|
||||||
|
switch s.config.SyslogWriter {
|
||||||
|
case "info":
|
||||||
|
err = s.out.Info(p)
|
||||||
|
case "debug":
|
||||||
|
err = s.out.Debug(p)
|
||||||
|
case "alert":
|
||||||
|
err = s.out.Alert(p)
|
||||||
|
case "emerg":
|
||||||
|
err = s.out.Emerg(p)
|
||||||
|
case "err":
|
||||||
|
err = s.out.Err(p)
|
||||||
|
case "notice":
|
||||||
|
err = s.out.Notice(p)
|
||||||
|
case "warning":
|
||||||
|
err = s.out.Warning(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SyslogSink) Flush() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SyslogSink) Close() {
|
||||||
|
s.out.Close()
|
||||||
|
cclog.ComponentDebug(s.name, "CLOSE")
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSyslogSink(name string, config json.RawMessage) (Sink, error) {
|
||||||
|
var err error = nil
|
||||||
|
s := new(SyslogSink)
|
||||||
|
|
||||||
|
s.name = fmt.Sprintf("SyslogSink(%s)", name)
|
||||||
|
|
||||||
|
// Read in the config JSON
|
||||||
|
if len(config) > 0 {
|
||||||
|
d := json.NewDecoder(bytes.NewReader(config))
|
||||||
|
d.DisallowUnknownFields()
|
||||||
|
if err := d.Decode(&s.config); err != nil {
|
||||||
|
cclog.ComponentError(s.name, "Error reading config:", err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create lookup map to use meta infos as tags in the output metric
|
||||||
|
s.meta_as_tags = make(map[string]bool)
|
||||||
|
for _, k := range s.config.MetaAsTags {
|
||||||
|
s.meta_as_tags[k] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(s.config.SyslogTag) == 0 {
|
||||||
|
err := errors.New("syslog tag must be non-empty")
|
||||||
|
cclog.ComponentError(s.name, err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(s.config.SyslogPriories) == 0 {
|
||||||
|
err := errors.New("syslog prioritiey must be non-empty")
|
||||||
|
cclog.ComponentError(s.name, err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, p := range s.config.SyslogPriories {
|
||||||
|
if prio, ok := AvailableSyslogPriorities[p]; !ok {
|
||||||
|
err := fmt.Errorf("invalid syslog priority '%s'", p)
|
||||||
|
cclog.ComponentError(s.name, err.Error())
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
s.syslogPrios |= prio
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mywriter := ""
|
||||||
|
for _, w := range []string{
|
||||||
|
"info",
|
||||||
|
"debug",
|
||||||
|
"alert",
|
||||||
|
"emerg",
|
||||||
|
"err",
|
||||||
|
"notice",
|
||||||
|
"warning",
|
||||||
|
} {
|
||||||
|
if s.config.SyslogWriter == w {
|
||||||
|
mywriter = w
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(mywriter) == 0 {
|
||||||
|
err := fmt.Errorf("invalid syslog writer '%s'", s.config.SyslogWriter)
|
||||||
|
cclog.ComponentError(s.name, err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.out, err = syslog.New(s.syslogPrios, s.config.SyslogTag)
|
||||||
|
if err != nil {
|
||||||
|
cclog.ComponentError(s.name, err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
64
sinks/syslogSink.md
Normal file
64
sinks/syslogSink.md
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
## `syslog` sink
|
||||||
|
|
||||||
|
The `syslog` sink provides an easy way to submit metrics and events to the Syslog logging system.
|
||||||
|
|
||||||
|
### Configuration structure
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"<name>": {
|
||||||
|
"type": "syslog",
|
||||||
|
"meta_as_tags" : [],
|
||||||
|
"syslog_tag" : "mytag",
|
||||||
|
"syslog_priorities" : [
|
||||||
|
"LOG_INFO",
|
||||||
|
"LOG_LOCAL7",
|
||||||
|
],
|
||||||
|
"syslog_writer" : "info"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
- `type`: makes the sink an `syslog` sink
|
||||||
|
- `meta_as_tags`: print meta information as tags in the output (optional)
|
||||||
|
- `syslog_tag`: The tag submitted with each message
|
||||||
|
- `syslog_priorities`: List of syslog priorities. Will be OR'd together
|
||||||
|
- `syslog_writer`: Submit metrics with this write level
|
||||||
|
|
||||||
|
|
||||||
|
#### Possible priorities for `syslog_priorities`
|
||||||
|
- `LOG_EMERG`
|
||||||
|
- `LOG_ALERT`
|
||||||
|
- `LOG_CRIT`
|
||||||
|
- `LOG_ERR`
|
||||||
|
- `LOG_NOTICE`
|
||||||
|
- `LOG_INFO`
|
||||||
|
- `LOG_DEBUG`
|
||||||
|
- `LOG_USER`
|
||||||
|
- `LOG_MAIL`
|
||||||
|
- `LOG_DAEMON`
|
||||||
|
- `LOG_AUTH`
|
||||||
|
- `LOG_SYSLOG`
|
||||||
|
- `LOG_LPR`
|
||||||
|
- `LOG_NEWS`
|
||||||
|
- `LOG_UUCP`
|
||||||
|
- `LOG_CRON`
|
||||||
|
- `LOG_AUTHPRIV`
|
||||||
|
- `LOG_FTP`
|
||||||
|
- `LOG_LOCAL0`
|
||||||
|
- `LOG_LOCAL1`
|
||||||
|
- `LOG_LOCAL2`
|
||||||
|
- `LOG_LOCAL3`
|
||||||
|
- `LOG_LOCAL4`
|
||||||
|
- `LOG_LOCAL5`
|
||||||
|
- `LOG_LOCAL6`
|
||||||
|
- `LOG_LOCAL7`
|
||||||
|
|
||||||
|
#### Possible writers for `syslog_writer`
|
||||||
|
- `info`
|
||||||
|
- `debug`
|
||||||
|
- `alert`
|
||||||
|
- `emerg`
|
||||||
|
- `err`
|
||||||
|
- `notice`
|
||||||
|
- `warning`
|
Reference in New Issue
Block a user