Merge pull request #41 from ClusterCockpit/new_sink_instances

New sink instances
This commit is contained in:
Holger Obermaier 2022-02-23 15:05:53 +01:00 committed by GitHub
commit 6843902909
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 292 additions and 273 deletions

View File

@ -6,6 +6,7 @@ This folder contains the SinkManager and sink implementations for the cc-metric-
- [`stdout`](./stdoutSink.md): Print all metrics to `stdout`, `stderr` or a file - [`stdout`](./stdoutSink.md): Print all metrics to `stdout`, `stderr` or a file
- [`http`](./httpSink.md): Send metrics to an HTTP server as POST requests - [`http`](./httpSink.md): Send metrics to an HTTP server as POST requests
- [`influxdb`](./influxSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database - [`influxdb`](./influxSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database
- [`influxasync`](./influxAsyncSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database with non-blocking write API
- [`nats`](./natsSink.md): Publish metrics to the [NATS](https://nats.io/) network overlay system - [`nats`](./natsSink.md): Publish metrics to the [NATS](https://nats.io/) network overlay system
- [`ganglia`](./gangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) using the `gmetric` CLI tool - [`ganglia`](./gangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) using the `gmetric` CLI tool
- [`libganglia`](./libgangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) directly using `libganglia.so` - [`libganglia`](./libgangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) directly using `libganglia.so`
@ -34,11 +35,12 @@ The configuration file for the sinks is a list of configurations. The `type` fie
# Contributing own sinks # Contributing own sinks
A sink contains four functions and is derived from the type `sink`: A sink contains five functions and is derived from the type `sink`:
* `Init(config json.RawMessage) error` * `Init(name string, config json.RawMessage) error`
* `Write(point CCMetric) error` * `Write(point CCMetric) error`
* `Flush() error` * `Flush() error`
* `Close()` * `Close()`
* `New<Typename>(name string, config json.RawMessage) (Sink, error)` (calls the `Init()` function)
The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function writes/sends the data. For non-blocking sinks, the `Flush()` method tells the sink to drain its internal buffers. The `Close()` function should tear down anything created in `Init()`. The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function writes/sends the data. For non-blocking sinks, the `Flush()` method tells the sink to drain its internal buffers. The `Close()` function should tear down anything created in `Init()`.
@ -65,8 +67,8 @@ type SampleSink struct {
} }
// Initialize the sink by giving it a name and reading in the config JSON // Initialize the sink by giving it a name and reading in the config JSON
func (s *SampleSink) Init(config json.RawMessage) error { func (s *SampleSink) Init(name string, config json.RawMessage) error {
s.name = "SampleSink" // Always specify a name here s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here
// Read in the config JSON // Read in the config JSON
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config) err := json.Unmarshal(config, &s.config)
@ -91,4 +93,13 @@ func (s *SampleSink) Flush() error {
// Close sink: close network connection, close files, close libraries, ... // Close sink: close network connection, close files, close libraries, ...
func (s *SampleSink) Close() {} func (s *SampleSink) Close() {}
// New function to create a new instance of the sink
func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
s := new(SampleSink)
err := s.Init(name, config)
return s, err
}
``` ```

View File

@ -33,41 +33,6 @@ type GangliaSink struct {
config GangliaSinkConfig config GangliaSinkConfig
} }
func (s *GangliaSink) Init(config json.RawMessage) error {
var err error = nil
s.name = "GangliaSink"
s.config.AddTagsAsDesc = false
s.config.AddGangliaGroup = false
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return err
}
}
s.gmetric_path = ""
s.gmetric_config = ""
if len(s.config.GmetricPath) > 0 {
p, err := exec.LookPath(s.config.GmetricPath)
if err == nil {
s.gmetric_path = p
}
}
if len(s.gmetric_path) == 0 {
p, err := exec.LookPath(string(GMETRIC_EXEC))
if err == nil {
s.gmetric_path = p
}
}
if len(s.gmetric_path) == 0 {
err = errors.New("cannot find executable 'gmetric'")
}
if len(s.config.GmetricConfig) > 0 {
s.gmetric_config = s.config.GmetricConfig
}
return err
}
func (s *GangliaSink) Write(point lp.CCMetric) error { func (s *GangliaSink) Write(point lp.CCMetric) error {
var err error = nil var err error = nil
var tagsstr []string var tagsstr []string
@ -168,4 +133,37 @@ func (s *GangliaSink) Flush() error {
func (s *GangliaSink) Close() { func (s *GangliaSink) Close() {
} }
func NewGangliaSink() func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
s := new(GangliaSink)
s.name = fmt.Sprintf("GangliaSink(%s)", name)
s.config.AddTagsAsDesc = false
s.config.AddGangliaGroup = false
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return nil, err
}
}
s.gmetric_path = ""
s.gmetric_config = ""
if len(s.config.GmetricPath) > 0 {
p, err := exec.LookPath(s.config.GmetricPath)
if err == nil {
s.gmetric_path = p
}
}
if len(s.gmetric_path) == 0 {
p, err := exec.LookPath(string(GMETRIC_EXEC))
if err == nil {
s.gmetric_path = p
}
}
if len(s.gmetric_path) == 0 {
return nil, errors.New("cannot find executable 'gmetric'")
}
if len(s.config.GmetricConfig) > 0 {
s.gmetric_config = s.config.GmetricConfig
}
return s, nil
}

View File

@ -38,57 +38,6 @@ type HttpSink struct {
flushDelay time.Duration flushDelay time.Duration
} }
func (s *HttpSink) Init(config json.RawMessage) error {
// Set default values
s.name = "HttpSink"
s.config.MaxIdleConns = 10
s.config.IdleConnTimeout = "5s"
s.config.Timeout = "5s"
s.config.FlushDelay = "1s"
// Read config
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
if len(s.config.URL) == 0 {
return errors.New("`url` config option is required for HTTP sink")
}
if s.config.MaxIdleConns > 0 {
s.maxIdleConns = s.config.MaxIdleConns
}
if len(s.config.IdleConnTimeout) > 0 {
t, err := time.ParseDuration(s.config.IdleConnTimeout)
if err == nil {
s.idleConnTimeout = t
}
}
if len(s.config.Timeout) > 0 {
t, err := time.ParseDuration(s.config.Timeout)
if err == nil {
s.timeout = t
}
}
if len(s.config.FlushDelay) > 0 {
t, err := time.ParseDuration(s.config.FlushDelay)
if err == nil {
s.flushDelay = t
}
}
tr := &http.Transport{
MaxIdleConns: s.maxIdleConns,
IdleConnTimeout: s.idleConnTimeout,
}
s.client = &http.Client{Transport: tr, Timeout: s.timeout}
s.buffer = &bytes.Buffer{}
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
return nil
}
func (s *HttpSink) Write(m lp.CCMetric) error { func (s *HttpSink) Write(m lp.CCMetric) error {
if s.buffer.Len() == 0 && s.flushDelay != 0 { if s.buffer.Len() == 0 && s.flushDelay != 0 {
// This is the first write since the last flush, start the flushTimer! // This is the first write since the last flush, start the flushTimer!
@ -169,3 +118,54 @@ func (s *HttpSink) Close() {
} }
s.client.CloseIdleConnections() s.client.CloseIdleConnections()
} }
func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
s := new(HttpSink)
// Set default values
s.name = fmt.Sprintf("HttpSink(%s)", name)
s.config.MaxIdleConns = 10
s.config.IdleConnTimeout = "5s"
s.config.Timeout = "5s"
s.config.FlushDelay = "1s"
// Read config
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
}
}
if len(s.config.URL) == 0 {
return nil, errors.New("`url` config option is required for HTTP sink")
}
if s.config.MaxIdleConns > 0 {
s.maxIdleConns = s.config.MaxIdleConns
}
if len(s.config.IdleConnTimeout) > 0 {
t, err := time.ParseDuration(s.config.IdleConnTimeout)
if err == nil {
s.idleConnTimeout = t
}
}
if len(s.config.Timeout) > 0 {
t, err := time.ParseDuration(s.config.Timeout)
if err == nil {
s.timeout = t
}
}
if len(s.config.FlushDelay) > 0 {
t, err := time.ParseDuration(s.config.FlushDelay)
if err == nil {
s.flushDelay = t
}
}
tr := &http.Transport{
MaxIdleConns: s.maxIdleConns,
IdleConnTimeout: s.idleConnTimeout,
}
s.client = &http.Client{Transport: tr, Timeout: s.timeout}
s.buffer = &bytes.Buffer{}
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
return s, nil
}

View File

@ -30,11 +30,10 @@ type InfluxAsyncSinkConfig struct {
type InfluxAsyncSink struct { type InfluxAsyncSink struct {
sink sink
client influxdb2.Client client influxdb2.Client
writeApi influxdb2Api.WriteAPI writeApi influxdb2Api.WriteAPI
retPolicy string errors <-chan error
errors <-chan error config InfluxAsyncSinkConfig
config InfluxAsyncSinkConfig
} }
func (s *InfluxAsyncSink) connect() error { func (s *InfluxAsyncSink) connect() error {
@ -68,39 +67,6 @@ func (s *InfluxAsyncSink) connect() error {
return nil return nil
} }
func (s *InfluxAsyncSink) Init(config json.RawMessage) error {
s.name = "InfluxSink"
// Set default for maximum number of points sent to server in single request.
s.config.BatchSize = 100
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return errors.New("not all configuration variables set required by InfluxAsyncSink")
}
// Connect to InfluxDB server
err := s.connect()
// Start background: Read from error channel
s.errors = s.writeApi.Errors()
go func() {
for err := range s.errors {
cclog.ComponentError(s.name, err.Error())
}
}()
return err
}
func (s *InfluxAsyncSink) Write(m lp.CCMetric) error { func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
s.writeApi.WritePoint( s.writeApi.WritePoint(
m.ToPoint(s.config.MetaAsTags), m.ToPoint(s.config.MetaAsTags),
@ -118,3 +84,40 @@ func (s *InfluxAsyncSink) Close() {
s.writeApi.Flush() s.writeApi.Flush()
s.client.Close() s.client.Close()
} }
func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
s := new(InfluxAsyncSink)
s.name = fmt.Sprintf("InfluxSink(%s)", name)
// Set default for maximum number of points sent to server in single request.
s.config.BatchSize = 100
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return nil, errors.New("not all configuration variables set required by InfluxAsyncSink")
}
// Connect to InfluxDB server
if err := s.connect(); err != nil {
return nil, fmt.Errorf("Unable to connect: %v", err)
}
// Start background: Read from error channel
s.errors = s.writeApi.Errors()
go func() {
for err := range s.errors {
cclog.ComponentError(s.name, err.Error())
}
}()
return s, nil
}

View File

@ -57,26 +57,6 @@ func (s *InfluxSink) connect() error {
return nil return nil
} }
func (s *InfluxSink) Init(config json.RawMessage) error {
s.name = "InfluxSink"
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return errors.New("not all configuration variables set required by InfluxSink")
}
// Connect to InfluxDB server
return s.connect()
}
func (s *InfluxSink) Write(m lp.CCMetric) error { func (s *InfluxSink) Write(m lp.CCMetric) error {
err := err :=
s.writeApi.WritePoint( s.writeApi.WritePoint(
@ -94,3 +74,27 @@ func (s *InfluxSink) Close() {
cclog.ComponentDebug(s.name, "Closing InfluxDB connection") cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
s.client.Close() s.client.Close()
} }
func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
s := new(InfluxSink)
s.name = fmt.Sprintf("InfluxSink(%s)", name)
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return nil, errors.New("not all configuration variables set required by InfluxSink")
}
// Connect to InfluxDB server
if err := s.connect(); err != nil {
return nil, fmt.Errorf("Unable to connect: %v", err)
}
return s, nil
}

View File

@ -109,65 +109,6 @@ type LibgangliaSink struct {
cstrCache map[string]*C.char cstrCache map[string]*C.char
} }
func (s *LibgangliaSink) Init(config json.RawMessage) error {
var err error = nil
s.name = "LibgangliaSink"
//s.config.AddTagsAsDesc = false
s.config.AddGangliaGroup = false
s.config.AddTypeToName = false
s.config.AddUnits = true
s.config.GmondConfig = string(GMOND_CONFIG_FILE)
s.config.GangliaLib = string(GANGLIA_LIB_NAME)
if len(config) > 0 {
err = json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config:", err.Error())
return err
}
}
lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
if lib == nil {
return fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)
}
err = lib.Open()
if err != nil {
return fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
}
// Set up cache for the C strings
s.cstrCache = make(map[string]*C.char)
// s.cstrCache["globals"] = C.CString("globals")
// s.cstrCache["override_hostname"] = C.CString("override_hostname")
// s.cstrCache["override_ip"] = C.CString("override_ip")
// Add some constant strings
s.cstrCache["GROUP"] = C.CString("GROUP")
s.cstrCache["CLUSTER"] = C.CString("CLUSTER")
s.cstrCache[""] = C.CString("")
// Add cluster name for lookup in Write()
if len(s.config.ClusterName) > 0 {
s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName)
}
// Add supported types for later lookup in Write()
s.cstrCache["double"] = C.CString("double")
s.cstrCache["int32"] = C.CString("int32")
s.cstrCache["string"] = C.CString("string")
// Create Ganglia pool
s.global_context = C.Ganglia_pool_create(nil)
// Load Ganglia configuration
s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig)
s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0)
//globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"])
//override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"])
//override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"])
s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config)
return nil
}
func (s *LibgangliaSink) Write(point lp.CCMetric) error { func (s *LibgangliaSink) Write(point lp.CCMetric) error {
var err error = nil var err error = nil
var c_name *C.char var c_name *C.char
@ -316,3 +257,63 @@ func (s *LibgangliaSink) Close() {
C.free(unsafe.Pointer(cstr)) C.free(unsafe.Pointer(cstr))
} }
} }
func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) {
s := new(LibgangliaSink)
var err error = nil
s.name = fmt.Sprintf("LibgangliaSink(%s)", name)
//s.config.AddTagsAsDesc = false
s.config.AddGangliaGroup = false
s.config.AddTypeToName = false
s.config.AddUnits = true
s.config.GmondConfig = string(GMOND_CONFIG_FILE)
s.config.GangliaLib = string(GANGLIA_LIB_NAME)
if len(config) > 0 {
err = json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config:", err.Error())
return nil, err
}
}
lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
if lib == nil {
return nil, fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)
}
err = lib.Open()
if err != nil {
return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
}
// Set up cache for the C strings
s.cstrCache = make(map[string]*C.char)
// s.cstrCache["globals"] = C.CString("globals")
// s.cstrCache["override_hostname"] = C.CString("override_hostname")
// s.cstrCache["override_ip"] = C.CString("override_ip")
// Add some constant strings
s.cstrCache["GROUP"] = C.CString("GROUP")
s.cstrCache["CLUSTER"] = C.CString("CLUSTER")
s.cstrCache[""] = C.CString("")
// Add cluster name for lookup in Write()
if len(s.config.ClusterName) > 0 {
s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName)
}
// Add supported types for later lookup in Write()
s.cstrCache["double"] = C.CString("double")
s.cstrCache["int32"] = C.CString("int32")
s.cstrCache["string"] = C.CString("string")
// Create Ganglia pool
s.global_context = C.Ganglia_pool_create(nil)
// Load Ganglia configuration
s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig)
s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0)
//globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"])
//override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"])
//override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"])
s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config)
return s, nil
}

View File

@ -1,8 +1,6 @@
package sinks package sinks
import ( import (
"encoding/json"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
) )
@ -17,7 +15,6 @@ type sink struct {
} }
type Sink interface { type Sink interface {
Init(config json.RawMessage) error
Write(point lp.CCMetric) error Write(point lp.CCMetric) error
Flush() error Flush() error
Close() Close()

View File

@ -53,30 +53,6 @@ func (s *NatsSink) connect() error {
return nil return nil
} }
func (s *NatsSink) Init(config json.RawMessage) error {
s.name = "NatsSink"
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 {
return errors.New("not all configuration variables set required by NatsSink")
}
// Setup Influx line protocol
s.buffer = &bytes.Buffer{}
s.buffer.Grow(1025)
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
s.encoder.SetMaxLineBytes(1024)
// Setup infos for connection
return s.connect()
}
func (s *NatsSink) Write(m lp.CCMetric) error { func (s *NatsSink) Write(m lp.CCMetric) error {
if s.client != nil { if s.client != nil {
_, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags)) _, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags))
@ -105,3 +81,31 @@ func (s *NatsSink) Close() {
s.client.Close() s.client.Close()
} }
} }
func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
s := new(NatsSink)
s.name = fmt.Sprintf("NatsSink(%s)", name)
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
return nil, err
}
}
if len(s.config.Host) == 0 ||
len(s.config.Port) == 0 ||
len(s.config.Database) == 0 {
return nil, errors.New("not all configuration variables set required by NatsSink")
}
// Setup Influx line protocol
s.buffer = &bytes.Buffer{}
s.buffer.Grow(1025)
s.encoder = influx.NewEncoder(s.buffer)
s.encoder.SetPrecision(time.Second)
s.encoder.SetMaxLineBytes(1024)
// Setup infos for connection
if err := s.connect(); err != nil {
return nil, fmt.Errorf("Unable to connect: %v", err)
}
return s, nil
}

View File

@ -13,14 +13,14 @@ import (
const SINK_MAX_FORWARD = 50 const SINK_MAX_FORWARD = 50
// Map of all available sinks // Map of all available sinks
var AvailableSinks = map[string]Sink{ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink, error){
"influxdb": new(InfluxSink), "ganglia": NewGangliaSink,
"stdout": new(StdoutSink), "libganglia": NewLibgangliaSink,
"nats": new(NatsSink), "stdout": NewStdoutSink,
"http": new(HttpSink), "nats": NewNatsSink,
"ganglia": new(GangliaSink), "influxdb": NewInfluxSink,
"influxasync": new(InfluxAsyncSink), "influxasync": NewInfluxAsyncSink,
"libganglia": new(LibgangliaSink), "http": NewHttpSink,
} }
// Metric collector manager data structure // Metric collector manager data structure
@ -149,8 +149,7 @@ func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error {
cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type) cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type)
return err return err
} }
s := AvailableSinks[sinkConfig.Type] s, err := AvailableSinks[sinkConfig.Type](name, rawConfig)
err = s.Init(rawConfig)
if err != nil { if err != nil {
cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error()) cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error())
return err return err

View File

@ -19,34 +19,6 @@ type StdoutSink struct {
} }
} }
func (s *StdoutSink) Init(config json.RawMessage) error {
s.name = "StdoutSink"
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return err
}
}
s.output = os.Stdout
if len(s.config.Output) > 0 {
switch strings.ToLower(s.config.Output) {
case "stdout":
s.output = os.Stdout
case "stderr":
s.output = os.Stderr
default:
f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
if err != nil {
return err
}
s.output = f
}
}
s.meta_as_tags = s.config.MetaAsTags
return nil
}
func (s *StdoutSink) Write(m lp.CCMetric) error { func (s *StdoutSink) Write(m lp.CCMetric) error {
fmt.Fprint( fmt.Fprint(
s.output, s.output,
@ -65,3 +37,33 @@ func (s *StdoutSink) Close() {
s.output.Close() s.output.Close()
} }
} }
func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
s := new(StdoutSink)
s.name = fmt.Sprintf("StdoutSink(%s)", name)
if len(config) > 0 {
err := json.Unmarshal(config, &s.config)
if err != nil {
return nil, err
}
}
s.output = os.Stdout
if len(s.config.Output) > 0 {
switch strings.ToLower(s.config.Output) {
case "stdout":
s.output = os.Stdout
case "stderr":
s.output = os.Stderr
default:
f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
if err != nil {
return nil, err
}
s.output = f
}
}
s.meta_as_tags = s.config.MetaAsTags
return s, nil
}