From 18a226183c92cf10c59be029d1667761ba592f23 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 22 Feb 2022 16:15:25 +0100 Subject: [PATCH] Use new sink instances to allow multiple of same sink type --- sinks/gangliaSink.go | 10 +++++++--- sinks/httpSink.go | 10 ++++++++-- sinks/influxAsyncSink.go | 10 ++++++++-- sinks/influxSink.go | 10 ++++++++-- sinks/libgangliaSink.go | 10 ++++++++-- sinks/metricSink.go | 2 +- sinks/natsSink.go | 10 ++++++++-- sinks/sinkManager.go | 19 +++++++++---------- sinks/stdoutSink.go | 10 ++++++++-- 9 files changed, 65 insertions(+), 26 deletions(-) diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index ae53dd8..0c9459b 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -33,9 +33,9 @@ type GangliaSink struct { config GangliaSinkConfig } -func (s *GangliaSink) Init(config json.RawMessage) error { +func (s *GangliaSink) Init(name string, config json.RawMessage) error { var err error = nil - s.name = "GangliaSink" + s.name = fmt.Sprintf("GangliaSink(%s)", name) s.config.AddTagsAsDesc = false s.config.AddGangliaGroup = false if len(config) > 0 { @@ -168,4 +168,8 @@ func (s *GangliaSink) Flush() error { func (s *GangliaSink) Close() { } -func NewGangliaSink() +func NewGangliaSink(name string, config json.RawMessage) (Sink, error) { + s := new(GangliaSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/httpSink.go b/sinks/httpSink.go index ce46bab..41a5919 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -38,9 +38,9 @@ type HttpSink struct { flushDelay time.Duration } -func (s *HttpSink) Init(config json.RawMessage) error { +func (s *HttpSink) Init(name string, config json.RawMessage) error { // Set default values - s.name = "HttpSink" + s.name = fmt.Sprintf("HttpSink(%s)", name) s.config.MaxIdleConns = 10 s.config.IdleConnTimeout = "5s" s.config.Timeout = "5s" @@ -169,3 +169,9 @@ func (s *HttpSink) Close() { } s.client.CloseIdleConnections() } + +func NewHttpSink(name string, config json.RawMessage) (Sink, error) { + s := new(HttpSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index 20aa60c..e2f8995 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -68,8 +68,8 @@ func (s *InfluxAsyncSink) connect() error { return nil } -func (s *InfluxAsyncSink) Init(config json.RawMessage) error { - s.name = "InfluxSink" +func (s *InfluxAsyncSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("InfluxSink(%s)", name) // Set default for maximum number of points sent to server in single request. s.config.BatchSize = 100 @@ -118,3 +118,9 @@ func (s *InfluxAsyncSink) Close() { s.writeApi.Flush() s.client.Close() } + +func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { + s := new(InfluxAsyncSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 99304c0..f235054 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -57,8 +57,8 @@ func (s *InfluxSink) connect() error { return nil } -func (s *InfluxSink) Init(config json.RawMessage) error { - s.name = "InfluxSink" +func (s *InfluxSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("InfluxSink(%s)", name) if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -94,3 +94,9 @@ func (s *InfluxSink) Close() { cclog.ComponentDebug(s.name, "Closing InfluxDB connection") s.client.Close() } + +func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { + s := new(InfluxSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/libgangliaSink.go b/sinks/libgangliaSink.go index 7136e42..9a45df4 100644 --- a/sinks/libgangliaSink.go +++ b/sinks/libgangliaSink.go @@ -109,9 +109,9 @@ type LibgangliaSink struct { cstrCache map[string]*C.char } -func (s *LibgangliaSink) Init(config json.RawMessage) error { +func (s *LibgangliaSink) Init(name string, config json.RawMessage) error { var err error = nil - s.name = "LibgangliaSink" + s.name = fmt.Sprintf("LibgangliaSink(%s)", name) //s.config.AddTagsAsDesc = false s.config.AddGangliaGroup = false s.config.AddTypeToName = false @@ -316,3 +316,9 @@ func (s *LibgangliaSink) Close() { C.free(unsafe.Pointer(cstr)) } } + +func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) { + s := new(LibgangliaSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/metricSink.go b/sinks/metricSink.go index d76f5f2..4583a2c 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -17,7 +17,7 @@ type sink struct { } type Sink interface { - Init(config json.RawMessage) error + Init(name string, config json.RawMessage) error Write(point lp.CCMetric) error Flush() error Close() diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 187157e..6087da0 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -53,8 +53,8 @@ func (s *NatsSink) connect() error { return nil } -func (s *NatsSink) Init(config json.RawMessage) error { - s.name = "NatsSink" +func (s *NatsSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("NatsSink(%s)", name) if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -105,3 +105,9 @@ func (s *NatsSink) Close() { s.client.Close() } } + +func NewNatsSink(name string, config json.RawMessage) (Sink, error) { + s := new(NatsSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 487e7ca..f531f5d 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -13,14 +13,14 @@ import ( const SINK_MAX_FORWARD = 50 // Map of all available sinks -var AvailableSinks = map[string]Sink{ - "influxdb": new(InfluxSink), - "stdout": new(StdoutSink), - "nats": new(NatsSink), - "http": new(HttpSink), - "ganglia": new(GangliaSink), - "influxasync": new(InfluxAsyncSink), - "libganglia": new(LibgangliaSink), +var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink, error){ + "ganglia": NewGangliaSink, + "libganglia": NewLibgangliaSink, + "stdout": NewStdoutSink, + "nats": NewNatsSink, + "influxdb": NewInfluxSink, + "influxasync": NewInfluxAsyncSink, + "http": NewHttpSink, } // Metric collector manager data structure @@ -149,8 +149,7 @@ func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error { cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type) return err } - s := AvailableSinks[sinkConfig.Type] - err = s.Init(rawConfig) + s, err := AvailableSinks[sinkConfig.Type](name, rawConfig) if err != nil { cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error()) return err diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index 5d0761a..d6c2e1b 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -19,8 +19,8 @@ type StdoutSink struct { } } -func (s *StdoutSink) Init(config json.RawMessage) error { - s.name = "StdoutSink" +func (s *StdoutSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("StdoutSink(%s)", name) if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -65,3 +65,9 @@ func (s *StdoutSink) Close() { s.output.Close() } } + +func NewStdoutSink(name string, config json.RawMessage) (Sink, error) { + s := new(StdoutSink) + s.Init(name, config) + return s, nil +}