mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-20 21:05:06 +02:00 
			
		
		
		
	Compare commits
	
		
			1 Commits
		
	
	
		
			amqp_sink
			...
			syslog_sin
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | f36a0c13b0 | 
| @@ -99,7 +99,10 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			output <- lp.FromInfluxMetric(c) | ||||
| 			y := lp.FromInfluxMetric(c) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	for _, file := range m.files { | ||||
| @@ -118,7 +121,10 @@ func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetri | ||||
| 			if skip { | ||||
| 				continue | ||||
| 			} | ||||
| 			output <- lp.FromInfluxMetric(f) | ||||
| 			y := lp.FromInfluxMetric(f) | ||||
| 			if err == nil { | ||||
| 				output <- y | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -1,352 +0,0 @@ | ||||
| package sinks | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"net" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" | ||||
| 	influx "github.com/influxdata/line-protocol/v2/lineprotocol" | ||||
| 	amqp "github.com/rabbitmq/amqp091-go" | ||||
| 	"golang.org/x/exp/slices" | ||||
| ) | ||||
|  | ||||
| type AmqpSinkConfig struct { | ||||
| 	// defines JSON tags for 'type' and 'meta_as_tags' (string list) | ||||
| 	// See: metricSink.go | ||||
| 	defaultSinkConfig | ||||
| 	// Additional config options, for AmqpSink | ||||
| 	QueueName string `json:"queue_name"` | ||||
| 	// Maximum number of points sent to server in single request. | ||||
| 	// Default: 1000 | ||||
| 	BatchSize int `json:"batch_size,omitempty"` | ||||
|  | ||||
| 	// Time interval for delayed sending of metrics. | ||||
| 	// If the buffers are already filled before the end of this interval, | ||||
| 	// the metrics are sent without further delay. | ||||
| 	// Default: 1s | ||||
| 	FlushInterval string `json:"flush_delay,omitempty"` | ||||
| 	flushDelay    time.Duration | ||||
|  | ||||
| 	Hostname       string `json:"hostname"` | ||||
| 	Port           int    `json:"port"` | ||||
| 	PublishTimeout string `json:"publish_timeout,omitempty"` | ||||
| 	publishTimeout time.Duration | ||||
| 	Username       string `json:"username,omitempty"` | ||||
| 	Password       string `json:"password,omitempty"` | ||||
| } | ||||
|  | ||||
| type AmqpSink struct { | ||||
| 	// declares elements 	'name' and 'meta_as_tags' (string to bool map!) | ||||
| 	sink | ||||
| 	config AmqpSinkConfig // entry point to the AmqpSinkConfig | ||||
| 	// influx line protocol encoder | ||||
| 	encoder influx.Encoder | ||||
| 	// number of records stored in the encoder | ||||
| 	numRecordsInEncoder int | ||||
| 	// List of tags and meta data tags which should be used as tags | ||||
| 	extended_tag_list []key_value_pair | ||||
| 	// Flush() runs in another goroutine and accesses the influx line protocol encoder, | ||||
| 	// so this encoderLock has to protect the encoder and numRecordsInEncoder | ||||
| 	encoderLock sync.Mutex | ||||
|  | ||||
| 	// timer to run Flush() | ||||
| 	flushTimer *time.Timer | ||||
| 	// Lock to assure that only one timer is running at a time | ||||
| 	timerLock sync.Mutex | ||||
|  | ||||
| 	// WaitGroup to ensure only one send operation is running at a time | ||||
| 	sendWaitGroup sync.WaitGroup | ||||
|  | ||||
| 	client  *amqp.Connection | ||||
| 	channel *amqp.Channel | ||||
| 	queue   amqp.Queue | ||||
| } | ||||
|  | ||||
| // Implement functions required for Sink interface | ||||
| // Write(...), Flush(), Close() | ||||
| // See: metricSink.go | ||||
|  | ||||
| // Code to submit a single CCMetric to the sink | ||||
| func (s *AmqpSink) Write(m lp.CCMetric) error { | ||||
|  | ||||
| 	// Lock for encoder usage | ||||
| 	s.encoderLock.Lock() | ||||
|  | ||||
| 	// Encode measurement name | ||||
| 	s.encoder.StartLine(m.Name()) | ||||
|  | ||||
| 	// copy tags and meta data which should be used as tags | ||||
| 	s.extended_tag_list = s.extended_tag_list[:0] | ||||
| 	for key, value := range m.Tags() { | ||||
| 		s.extended_tag_list = | ||||
| 			append( | ||||
| 				s.extended_tag_list, | ||||
| 				key_value_pair{ | ||||
| 					key:   key, | ||||
| 					value: value, | ||||
| 				}, | ||||
| 			) | ||||
| 	} | ||||
| 	for _, key := range s.config.MetaAsTags { | ||||
| 		if value, ok := m.GetMeta(key); ok { | ||||
| 			s.extended_tag_list = | ||||
| 				append( | ||||
| 					s.extended_tag_list, | ||||
| 					key_value_pair{ | ||||
| 						key:   key, | ||||
| 						value: value, | ||||
| 					}, | ||||
| 				) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Encode tags (they musts be in lexical order) | ||||
| 	slices.SortFunc( | ||||
| 		s.extended_tag_list, | ||||
| 		func(a key_value_pair, b key_value_pair) int { | ||||
| 			if a.key < b.key { | ||||
| 				return -1 | ||||
| 			} | ||||
| 			if a.key > b.key { | ||||
| 				return +1 | ||||
| 			} | ||||
| 			return 0 | ||||
| 		}, | ||||
| 	) | ||||
| 	for i := range s.extended_tag_list { | ||||
| 		s.encoder.AddTag( | ||||
| 			s.extended_tag_list[i].key, | ||||
| 			s.extended_tag_list[i].value, | ||||
| 		) | ||||
| 	} | ||||
|  | ||||
| 	// Encode fields | ||||
| 	for key, value := range m.Fields() { | ||||
| 		s.encoder.AddField(key, influx.MustNewValue(value)) | ||||
| 	} | ||||
|  | ||||
| 	// Encode time stamp | ||||
| 	s.encoder.EndLine(m.Time()) | ||||
|  | ||||
| 	// Check for encoder errors | ||||
| 	if err := s.encoder.Err(); err != nil { | ||||
| 		// Unlock encoder usage | ||||
| 		s.encoderLock.Unlock() | ||||
|  | ||||
| 		return fmt.Errorf("encoding failed: %v", err) | ||||
| 	} | ||||
| 	s.numRecordsInEncoder++ | ||||
|  | ||||
| 	if s.config.flushDelay == 0 { | ||||
| 		// Unlock encoder usage | ||||
| 		s.encoderLock.Unlock() | ||||
|  | ||||
| 		// Directly flush if no flush delay is configured | ||||
| 		return s.Flush() | ||||
| 	} else if s.numRecordsInEncoder == s.config.BatchSize { | ||||
| 		// Unlock encoder usage | ||||
| 		s.encoderLock.Unlock() | ||||
|  | ||||
| 		// Stop flush timer | ||||
| 		if s.flushTimer != nil { | ||||
| 			if ok := s.flushTimer.Stop(); ok { | ||||
| 				cclog.ComponentDebug(s.name, "Write(): Stopped flush timer. Batch size limit reached before flush delay") | ||||
| 				s.timerLock.Unlock() | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// Flush if batch size is reached | ||||
| 		return s.Flush() | ||||
| 	} else if s.timerLock.TryLock() { | ||||
|  | ||||
| 		// Setup flush timer when flush delay is configured | ||||
| 		// and no other timer is already running | ||||
| 		if s.flushTimer != nil { | ||||
|  | ||||
| 			// Restarting existing flush timer | ||||
| 			cclog.ComponentDebug(s.name, "Write(): Restarting flush timer") | ||||
| 			s.flushTimer.Reset(s.config.flushDelay) | ||||
| 		} else { | ||||
|  | ||||
| 			// Creating and starting flush timer | ||||
| 			cclog.ComponentDebug(s.name, "Write(): Starting new flush timer") | ||||
| 			s.flushTimer = time.AfterFunc( | ||||
| 				s.config.flushDelay, | ||||
| 				func() { | ||||
| 					defer s.timerLock.Unlock() | ||||
| 					cclog.ComponentDebug(s.name, "Starting flush triggered by flush timer") | ||||
| 					if err := s.Flush(); err != nil { | ||||
| 						cclog.ComponentError(s.name, "Flush triggered by flush timer: flush failed:", err) | ||||
| 					} | ||||
| 				}) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Unlock encoder usage | ||||
| 	s.encoderLock.Unlock() | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // If the sink uses batched sends internally, you can tell to flush its buffers | ||||
| func (s *AmqpSink) Flush() error { | ||||
|  | ||||
| 	// Lock for encoder usage | ||||
| 	// Own lock for as short as possible: the time it takes to clone the buffer. | ||||
| 	s.encoderLock.Lock() | ||||
|  | ||||
| 	buf := slices.Clone(s.encoder.Bytes()) | ||||
| 	numRecordsInBuf := s.numRecordsInEncoder | ||||
| 	s.encoder.Reset() | ||||
| 	s.numRecordsInEncoder = 0 | ||||
|  | ||||
| 	// Unlock encoder usage | ||||
| 	s.encoderLock.Unlock() | ||||
|  | ||||
| 	if len(buf) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	cclog.ComponentDebug(s.name, "Flush(): Flushing", numRecordsInBuf, "metrics") | ||||
|  | ||||
| 	// Asynchron send of encoder metrics | ||||
| 	s.sendWaitGroup.Add(1) | ||||
| 	go func() { | ||||
| 		defer s.sendWaitGroup.Done() | ||||
| 		//startTime := time.Now() | ||||
| 		ctx, cancel := context.WithTimeout(context.Background(), s.config.flushDelay) | ||||
| 		defer cancel() | ||||
| 		err := s.channel.PublishWithContext(ctx, "", s.queue.Name, false, false, amqp.Publishing{ | ||||
| 			ContentType: "text/plain", | ||||
| 			Body:        buf, | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			cclog.ComponentError(s.name, err.Error()) | ||||
| 		} | ||||
| 	}() | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Close sink: close network connection, close files, close libraries, ... | ||||
| func (s *AmqpSink) Close() { | ||||
|  | ||||
| 	cclog.ComponentDebug(s.name, "CLOSE") | ||||
|  | ||||
| 	// Stop existing timer and immediately flush | ||||
| 	if s.flushTimer != nil { | ||||
| 		if ok := s.flushTimer.Stop(); ok { | ||||
| 			s.timerLock.Unlock() | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Flush | ||||
| 	if err := s.Flush(); err != nil { | ||||
| 		cclog.ComponentError(s.name, "Close():", "Flush failed:", err) | ||||
| 	} | ||||
|  | ||||
| 	// Wait for send operations to finish | ||||
| 	s.sendWaitGroup.Wait() | ||||
|  | ||||
| 	s.client.Close() | ||||
| 	s.client = nil | ||||
| } | ||||
|  | ||||
| // New function to create a new instance of the sink | ||||
| // Initialize the sink by giving it a name and reading in the config JSON | ||||
| func NewAmqpSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	s := new(AmqpSink) | ||||
|  | ||||
| 	// Set name of sampleSink | ||||
| 	// The name should be chosen in such a way that different instances of AmqpSink can be distinguished | ||||
| 	s.name = fmt.Sprintf("AmqpSink(%s)", name) // Always specify a name here | ||||
|  | ||||
| 	// Set defaults in s.config | ||||
| 	// Allow overwriting these defaults by reading config JSON | ||||
|  | ||||
| 	s.config.PublishTimeout = "4s" | ||||
| 	s.config.publishTimeout = time.Duration(4) * time.Second | ||||
| 	s.config.Hostname = "localhost" | ||||
| 	s.config.Port = 1883 | ||||
|  | ||||
| 	// Read in the config JSON | ||||
| 	if len(config) > 0 { | ||||
| 		d := json.NewDecoder(bytes.NewReader(config)) | ||||
| 		d.DisallowUnknownFields() | ||||
| 		if err := d.Decode(&s.config); err != nil { | ||||
| 			cclog.ComponentError(s.name, "Error reading config:", err.Error()) | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Create lookup map to use meta infos as tags in the output metric | ||||
| 	s.meta_as_tags = make(map[string]bool) | ||||
| 	for _, k := range s.config.MetaAsTags { | ||||
| 		s.meta_as_tags[k] = true | ||||
| 	} | ||||
|  | ||||
| 	// Check if all required fields in the config are set | ||||
| 	// E.g. use 'len(s.config.Option) > 0' for string settings | ||||
| 	if t, err := time.ParseDuration(s.config.PublishTimeout); err == nil { | ||||
| 		s.config.publishTimeout = t | ||||
| 	} else { | ||||
| 		err := fmt.Errorf("to parse duration for PublishTimeout: %s", s.config.PublishTimeout) | ||||
| 		cclog.ComponentError(s.name, err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	if t, err := time.ParseDuration(s.config.FlushInterval); err == nil { | ||||
| 		s.config.flushDelay = t | ||||
| 	} else { | ||||
| 		err := fmt.Errorf("to parse duration for FlushInterval: %s", s.config.FlushInterval) | ||||
| 		cclog.ComponentError(s.name, err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	url := net.JoinHostPort(s.config.Hostname, fmt.Sprintf("%d", s.config.Port)) | ||||
| 	userpart := "" | ||||
| 	if len(s.config.Username) > 0 { | ||||
| 		userpart = s.config.Username | ||||
| 		if len(s.config.Password) > 0 { | ||||
| 			userpart += ":" + s.config.Password | ||||
| 		} | ||||
| 		userpart += "@" | ||||
| 	} | ||||
| 	url = fmt.Sprintf("amqp://%s%s", userpart, url) | ||||
|  | ||||
| 	// Establish connection to the server, library, ... | ||||
| 	// Check required files exist and lookup path(s) of executable(s) | ||||
| 	c, err := amqp.Dial(url) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	s.client = c | ||||
|  | ||||
| 	ch, err := c.Channel() | ||||
| 	if err != nil { | ||||
| 		s.client.Close() | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	s.channel = ch | ||||
|  | ||||
| 	q, err := ch.QueueDeclare( | ||||
| 		s.config.QueueName, // name | ||||
| 		false,              // durable | ||||
| 		false,              // delete when unused | ||||
| 		false,              // exclusive | ||||
| 		false,              // no-wait | ||||
| 		nil,                // arguments | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		s.channel.Close() | ||||
| 		s.client.Close() | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	s.queue = q | ||||
|  | ||||
| 	// Return (nil, meaningful error message) in case of errors | ||||
| 	return s, nil | ||||
| } | ||||
| @@ -1,33 +0,0 @@ | ||||
| ## `amqp` sink | ||||
|  | ||||
| The `amqp` sink publishes all metrics into a RabbitMQ network. The publishing key is the queue name in the configuration file | ||||
|  | ||||
| ### Configuration structure | ||||
|  | ||||
| ```json | ||||
| { | ||||
|   "<name>": { | ||||
|     "type": "amqp", | ||||
|     "queue_name" : "myqueue", | ||||
|     "batch_size" : 1000, | ||||
|     "flush_delay": "4s", | ||||
|     "publish_timeout": "1s", | ||||
|     "host": "dbhost.example.com", | ||||
|     "port": 5672, | ||||
|     "username": "exampleuser", | ||||
|     "password" : "examplepw", | ||||
|     "meta_as_tags" : [], | ||||
|   } | ||||
| } | ||||
| ``` | ||||
|  | ||||
| - `type`: makes the sink an `amqp` sink, also `rabbitmq` is allowed as alias | ||||
| - `queue_name`: All metrics are published to this queue | ||||
| - `host`: Hostname of the RabbitMQ server | ||||
| - `port`: Port number of the RabbitMQ server | ||||
| - `username`: Username for basic authentication | ||||
| - `password`: Password for basic authentication | ||||
| - `meta_as_tags`: print all meta information as tags in the output (optional) | ||||
| - `publish_timeout`: Timeout for each publication operation (default `1s`) | ||||
| - `flush_delay`: Group metrics coming in to a single batch (default `4s`) | ||||
| - `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay (default: `1000`) | ||||
| @@ -21,8 +21,6 @@ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink, | ||||
| 	"influxdb":    NewInfluxSink, | ||||
| 	"influxasync": NewInfluxAsyncSink, | ||||
| 	"http":        NewHttpSink, | ||||
| 	"amqp":        NewAmqpSink, | ||||
| 	"rabbitmq":    NewAmqpSink, | ||||
| } | ||||
|  | ||||
| // Metric collector manager data structure | ||||
|   | ||||
							
								
								
									
										159
									
								
								sinks/syslogSink.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										159
									
								
								sinks/syslogSink.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,159 @@ | ||||
| package sinks | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"log/syslog" | ||||
|  | ||||
| 	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" | ||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" | ||||
| ) | ||||
|  | ||||
| var AvailableSyslogPriorities map[string]syslog.Priority = map[string]syslog.Priority{ | ||||
| 	"LOG_EMERG":    syslog.LOG_EMERG, | ||||
| 	"LOG_ALERT":    syslog.LOG_ALERT, | ||||
| 	"LOG_CRIT":     syslog.LOG_CRIT, | ||||
| 	"LOG_ERR":      syslog.LOG_ERR, | ||||
| 	"LOG_NOTICE":   syslog.LOG_NOTICE, | ||||
| 	"LOG_INFO":     syslog.LOG_INFO, | ||||
| 	"LOG_DEBUG":    syslog.LOG_DEBUG, | ||||
| 	"LOG_USER":     syslog.LOG_USER, | ||||
| 	"LOG_MAIL":     syslog.LOG_MAIL, | ||||
| 	"LOG_DAEMON":   syslog.LOG_DAEMON, | ||||
| 	"LOG_AUTH":     syslog.LOG_AUTH, | ||||
| 	"LOG_SYSLOG":   syslog.LOG_SYSLOG, | ||||
| 	"LOG_LPR":      syslog.LOG_LPR, | ||||
| 	"LOG_NEWS":     syslog.LOG_NEWS, | ||||
| 	"LOG_UUCP":     syslog.LOG_UUCP, | ||||
| 	"LOG_CRON":     syslog.LOG_CRON, | ||||
| 	"LOG_AUTHPRIV": syslog.LOG_AUTHPRIV, | ||||
| 	"LOG_FTP":      syslog.LOG_FTP, | ||||
| 	"LOG_LOCAL0":   syslog.LOG_LOCAL0, | ||||
| 	"LOG_LOCAL1":   syslog.LOG_LOCAL1, | ||||
| 	"LOG_LOCAL2":   syslog.LOG_LOCAL2, | ||||
| 	"LOG_LOCAL3":   syslog.LOG_LOCAL3, | ||||
| 	"LOG_LOCAL4":   syslog.LOG_LOCAL4, | ||||
| 	"LOG_LOCAL5":   syslog.LOG_LOCAL5, | ||||
| 	"LOG_LOCAL6":   syslog.LOG_LOCAL6, | ||||
| 	"LOG_LOCAL7":   syslog.LOG_LOCAL7, | ||||
| } | ||||
|  | ||||
| type SyslogSinkConfig struct { | ||||
| 	defaultSinkConfig | ||||
| 	SyslogTag      string   `json:"syslog_tag"` | ||||
| 	SyslogPriories []string `json:"syslog_priorities"` | ||||
| 	SyslogWriter   string   `json:"syslog_writer"` | ||||
| } | ||||
|  | ||||
| type SyslogSink struct { | ||||
| 	sink | ||||
| 	config      SyslogSinkConfig // entry point to the SyslogSinkConfig | ||||
| 	syslogPrios syslog.Priority | ||||
| 	out         *syslog.Writer | ||||
| } | ||||
|  | ||||
| func (s *SyslogSink) Write(point lp.CCMetric) error { | ||||
| 	var err error = nil | ||||
| 	p := point.ToLineProtocol(s.meta_as_tags) | ||||
| 	switch s.config.SyslogWriter { | ||||
| 	case "info": | ||||
| 		err = s.out.Info(p) | ||||
| 	case "debug": | ||||
| 		err = s.out.Debug(p) | ||||
| 	case "alert": | ||||
| 		err = s.out.Alert(p) | ||||
| 	case "emerg": | ||||
| 		err = s.out.Emerg(p) | ||||
| 	case "err": | ||||
| 		err = s.out.Err(p) | ||||
| 	case "notice": | ||||
| 		err = s.out.Notice(p) | ||||
| 	case "warning": | ||||
| 		err = s.out.Warning(p) | ||||
| 	} | ||||
|  | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func (s *SyslogSink) Flush() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *SyslogSink) Close() { | ||||
| 	s.out.Close() | ||||
| 	cclog.ComponentDebug(s.name, "CLOSE") | ||||
| } | ||||
|  | ||||
| func NewSyslogSink(name string, config json.RawMessage) (Sink, error) { | ||||
| 	var err error = nil | ||||
| 	s := new(SyslogSink) | ||||
|  | ||||
| 	s.name = fmt.Sprintf("SyslogSink(%s)", name) | ||||
|  | ||||
| 	// Read in the config JSON | ||||
| 	if len(config) > 0 { | ||||
| 		d := json.NewDecoder(bytes.NewReader(config)) | ||||
| 		d.DisallowUnknownFields() | ||||
| 		if err := d.Decode(&s.config); err != nil { | ||||
| 			cclog.ComponentError(s.name, "Error reading config:", err.Error()) | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Create lookup map to use meta infos as tags in the output metric | ||||
| 	s.meta_as_tags = make(map[string]bool) | ||||
| 	for _, k := range s.config.MetaAsTags { | ||||
| 		s.meta_as_tags[k] = true | ||||
| 	} | ||||
|  | ||||
| 	if len(s.config.SyslogTag) == 0 { | ||||
| 		err := errors.New("syslog tag must be non-empty") | ||||
| 		cclog.ComponentError(s.name, err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	if len(s.config.SyslogPriories) == 0 { | ||||
| 		err := errors.New("syslog prioritiey must be non-empty") | ||||
| 		cclog.ComponentError(s.name, err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	for _, p := range s.config.SyslogPriories { | ||||
| 		if prio, ok := AvailableSyslogPriorities[p]; !ok { | ||||
| 			err := fmt.Errorf("invalid syslog priority '%s'", p) | ||||
| 			cclog.ComponentError(s.name, err.Error()) | ||||
| 			return nil, err | ||||
| 		} else { | ||||
| 			s.syslogPrios |= prio | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	mywriter := "" | ||||
| 	for _, w := range []string{ | ||||
| 		"info", | ||||
| 		"debug", | ||||
| 		"alert", | ||||
| 		"emerg", | ||||
| 		"err", | ||||
| 		"notice", | ||||
| 		"warning", | ||||
| 	} { | ||||
| 		if s.config.SyslogWriter == w { | ||||
| 			mywriter = w | ||||
| 		} | ||||
| 	} | ||||
| 	if len(mywriter) == 0 { | ||||
| 		err := fmt.Errorf("invalid syslog writer '%s'", s.config.SyslogWriter) | ||||
| 		cclog.ComponentError(s.name, err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	s.out, err = syslog.New(s.syslogPrios, s.config.SyslogTag) | ||||
| 	if err != nil { | ||||
| 		cclog.ComponentError(s.name, err.Error()) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return s, nil | ||||
| } | ||||
							
								
								
									
										64
									
								
								sinks/syslogSink.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										64
									
								
								sinks/syslogSink.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,64 @@ | ||||
| ## `syslog` sink | ||||
|  | ||||
| The `syslog` sink provides an easy way to submit metrics and events to the Syslog logging system. | ||||
|  | ||||
| ### Configuration structure | ||||
|  | ||||
| ```json | ||||
| { | ||||
|   "<name>": { | ||||
|     "type": "syslog", | ||||
|     "meta_as_tags" : [], | ||||
|     "syslog_tag" : "mytag", | ||||
|     "syslog_priorities" : [ | ||||
|         "LOG_INFO", | ||||
|         "LOG_LOCAL7", | ||||
|     ], | ||||
|     "syslog_writer" : "info" | ||||
|   } | ||||
| } | ||||
| ``` | ||||
|  | ||||
| - `type`: makes the sink an `syslog` sink | ||||
| - `meta_as_tags`: print meta information as tags in the output (optional) | ||||
| - `syslog_tag`: The tag submitted with each message | ||||
| - `syslog_priorities`: List of syslog priorities. Will be OR'd together | ||||
| - `syslog_writer`: Submit metrics with this write level | ||||
|  | ||||
|  | ||||
| #### Possible priorities for `syslog_priorities` | ||||
| - `LOG_EMERG` | ||||
| - `LOG_ALERT` | ||||
| - `LOG_CRIT` | ||||
| - `LOG_ERR` | ||||
| - `LOG_NOTICE` | ||||
| - `LOG_INFO` | ||||
| - `LOG_DEBUG` | ||||
| - `LOG_USER` | ||||
| - `LOG_MAIL` | ||||
| - `LOG_DAEMON` | ||||
| - `LOG_AUTH` | ||||
| - `LOG_SYSLOG` | ||||
| - `LOG_LPR` | ||||
| - `LOG_NEWS` | ||||
| - `LOG_UUCP` | ||||
| - `LOG_CRON` | ||||
| - `LOG_AUTHPRIV` | ||||
| - `LOG_FTP` | ||||
| - `LOG_LOCAL0` | ||||
| - `LOG_LOCAL1` | ||||
| - `LOG_LOCAL2` | ||||
| - `LOG_LOCAL3` | ||||
| - `LOG_LOCAL4` | ||||
| - `LOG_LOCAL5` | ||||
| - `LOG_LOCAL6` | ||||
| - `LOG_LOCAL7` | ||||
|  | ||||
| #### Possible writers for `syslog_writer` | ||||
| - `info` | ||||
| - `debug` | ||||
| - `alert` | ||||
| - `emerg` | ||||
| - `err` | ||||
| - `notice` | ||||
| - `warning` | ||||
		Reference in New Issue
	
	Block a user