Use new sink instances to allow multiple of same sink type

This commit is contained in:
Thomas Roehl 2022-02-22 16:15:25 +01:00
parent 9cfbe10247
commit 18a226183c
9 changed files with 65 additions and 26 deletions

View File

@ -33,9 +33,9 @@ type GangliaSink struct {
config GangliaSinkConfig config GangliaSinkConfig
} }
func (s *GangliaSink) Init(config json.RawMessage) error { func (s *GangliaSink) Init(name string, config json.RawMessage) error {
var err error = nil var err error = nil
s.name = "GangliaSink" s.name = fmt.Sprintf("GangliaSink(%s)", name)
s.config.AddTagsAsDesc = false s.config.AddTagsAsDesc = false
s.config.AddGangliaGroup = false s.config.AddGangliaGroup = false
if len(config) > 0 { if len(config) > 0 {
@ -168,4 +168,8 @@ func (s *GangliaSink) Flush() error {
func (s *GangliaSink) Close() { func (s *GangliaSink) Close() {
} }
func NewGangliaSink() func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
s := new(GangliaSink)
s.Init(name, config)
return s, nil
}

View File

@ -38,9 +38,9 @@ type HttpSink struct {
flushDelay time.Duration flushDelay time.Duration
} }
func (s *HttpSink) Init(config json.RawMessage) error { func (s *HttpSink) Init(name string, config json.RawMessage) error {
// Set default values // Set default values
s.name = "HttpSink" s.name = fmt.Sprintf("HttpSink(%s)", name)
s.config.MaxIdleConns = 10 s.config.MaxIdleConns = 10
s.config.IdleConnTimeout = "5s" s.config.IdleConnTimeout = "5s"
s.config.Timeout = "5s" s.config.Timeout = "5s"
@ -169,3 +169,9 @@ func (s *HttpSink) Close() {
} }
s.client.CloseIdleConnections() s.client.CloseIdleConnections()
} }
func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
s := new(HttpSink)
s.Init(name, config)
return s, nil
}

View File

@ -68,8 +68,8 @@ func (s *InfluxAsyncSink) connect() error {
return nil return nil
} }
func (s *InfluxAsyncSink) Init(config json.RawMessage) error { func (s *InfluxAsyncSink) Init(name string, config json.RawMessage) error {
s.name = "InfluxSink" s.name = fmt.Sprintf("InfluxSink(%s)", name)
// Set default for maximum number of points sent to server in single request. // Set default for maximum number of points sent to server in single request.
s.config.BatchSize = 100 s.config.BatchSize = 100
@ -118,3 +118,9 @@ func (s *InfluxAsyncSink) Close() {
s.writeApi.Flush() s.writeApi.Flush()
s.client.Close() s.client.Close()
} }
func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
s := new(InfluxAsyncSink)
s.Init(name, config)
return s, nil
}

View File

@ -57,8 +57,8 @@ func (s *InfluxSink) connect() error {
return nil return nil
} }
func (s *InfluxSink) Init(config json.RawMessage) error { func (s *InfluxSink) Init(name string, config json.RawMessage) error {
s.name = "InfluxSink" s.name = fmt.Sprintf("InfluxSink(%s)", name)
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config) err := json.Unmarshal(config, &s.config)
if err != nil { if err != nil {
@ -94,3 +94,9 @@ func (s *InfluxSink) Close() {
cclog.ComponentDebug(s.name, "Closing InfluxDB connection") cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
s.client.Close() s.client.Close()
} }
func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
s := new(InfluxSink)
s.Init(name, config)
return s, nil
}

View File

@ -109,9 +109,9 @@ type LibgangliaSink struct {
cstrCache map[string]*C.char cstrCache map[string]*C.char
} }
func (s *LibgangliaSink) Init(config json.RawMessage) error { func (s *LibgangliaSink) Init(name string, config json.RawMessage) error {
var err error = nil var err error = nil
s.name = "LibgangliaSink" s.name = fmt.Sprintf("LibgangliaSink(%s)", name)
//s.config.AddTagsAsDesc = false //s.config.AddTagsAsDesc = false
s.config.AddGangliaGroup = false s.config.AddGangliaGroup = false
s.config.AddTypeToName = false s.config.AddTypeToName = false
@ -316,3 +316,9 @@ func (s *LibgangliaSink) Close() {
C.free(unsafe.Pointer(cstr)) C.free(unsafe.Pointer(cstr))
} }
} }
func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) {
s := new(LibgangliaSink)
s.Init(name, config)
return s, nil
}

View File

@ -17,7 +17,7 @@ type sink struct {
} }
type Sink interface { type Sink interface {
Init(config json.RawMessage) error Init(name string, config json.RawMessage) error
Write(point lp.CCMetric) error Write(point lp.CCMetric) error
Flush() error Flush() error
Close() Close()

View File

@ -53,8 +53,8 @@ func (s *NatsSink) connect() error {
return nil return nil
} }
func (s *NatsSink) Init(config json.RawMessage) error { func (s *NatsSink) Init(name string, config json.RawMessage) error {
s.name = "NatsSink" s.name = fmt.Sprintf("NatsSink(%s)", name)
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config) err := json.Unmarshal(config, &s.config)
if err != nil { if err != nil {
@ -105,3 +105,9 @@ func (s *NatsSink) Close() {
s.client.Close() s.client.Close()
} }
} }
func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
s := new(NatsSink)
s.Init(name, config)
return s, nil
}

View File

@ -13,14 +13,14 @@ import (
const SINK_MAX_FORWARD = 50 const SINK_MAX_FORWARD = 50
// Map of all available sinks // Map of all available sinks
var AvailableSinks = map[string]Sink{ var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink, error){
"influxdb": new(InfluxSink), "ganglia": NewGangliaSink,
"stdout": new(StdoutSink), "libganglia": NewLibgangliaSink,
"nats": new(NatsSink), "stdout": NewStdoutSink,
"http": new(HttpSink), "nats": NewNatsSink,
"ganglia": new(GangliaSink), "influxdb": NewInfluxSink,
"influxasync": new(InfluxAsyncSink), "influxasync": NewInfluxAsyncSink,
"libganglia": new(LibgangliaSink), "http": NewHttpSink,
} }
// Metric collector manager data structure // Metric collector manager data structure
@ -149,8 +149,7 @@ func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error {
cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type) cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type)
return err return err
} }
s := AvailableSinks[sinkConfig.Type] s, err := AvailableSinks[sinkConfig.Type](name, rawConfig)
err = s.Init(rawConfig)
if err != nil { if err != nil {
cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error()) cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error())
return err return err

View File

@ -19,8 +19,8 @@ type StdoutSink struct {
} }
} }
func (s *StdoutSink) Init(config json.RawMessage) error { func (s *StdoutSink) Init(name string, config json.RawMessage) error {
s.name = "StdoutSink" s.name = fmt.Sprintf("StdoutSink(%s)", name)
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config) err := json.Unmarshal(config, &s.config)
if err != nil { if err != nil {
@ -65,3 +65,9 @@ func (s *StdoutSink) Close() {
s.output.Close() s.output.Close()
} }
} }
func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
s := new(StdoutSink)
s.Init(name, config)
return s, nil
}