From 18a226183c92cf10c59be029d1667761ba592f23 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 22 Feb 2022 16:15:25 +0100 Subject: [PATCH 1/6] Use new sink instances to allow multiple of same sink type --- sinks/gangliaSink.go | 10 +++++++--- sinks/httpSink.go | 10 ++++++++-- sinks/influxAsyncSink.go | 10 ++++++++-- sinks/influxSink.go | 10 ++++++++-- sinks/libgangliaSink.go | 10 ++++++++-- sinks/metricSink.go | 2 +- sinks/natsSink.go | 10 ++++++++-- sinks/sinkManager.go | 19 +++++++++---------- sinks/stdoutSink.go | 10 ++++++++-- 9 files changed, 65 insertions(+), 26 deletions(-) diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index ae53dd8..0c9459b 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -33,9 +33,9 @@ type GangliaSink struct { config GangliaSinkConfig } -func (s *GangliaSink) Init(config json.RawMessage) error { +func (s *GangliaSink) Init(name string, config json.RawMessage) error { var err error = nil - s.name = "GangliaSink" + s.name = fmt.Sprintf("GangliaSink(%s)", name) s.config.AddTagsAsDesc = false s.config.AddGangliaGroup = false if len(config) > 0 { @@ -168,4 +168,8 @@ func (s *GangliaSink) Flush() error { func (s *GangliaSink) Close() { } -func NewGangliaSink() +func NewGangliaSink(name string, config json.RawMessage) (Sink, error) { + s := new(GangliaSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/httpSink.go b/sinks/httpSink.go index ce46bab..41a5919 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -38,9 +38,9 @@ type HttpSink struct { flushDelay time.Duration } -func (s *HttpSink) Init(config json.RawMessage) error { +func (s *HttpSink) Init(name string, config json.RawMessage) error { // Set default values - s.name = "HttpSink" + s.name = fmt.Sprintf("HttpSink(%s)", name) s.config.MaxIdleConns = 10 s.config.IdleConnTimeout = "5s" s.config.Timeout = "5s" @@ -169,3 +169,9 @@ func (s *HttpSink) Close() { } s.client.CloseIdleConnections() } + +func NewHttpSink(name string, config json.RawMessage) (Sink, error) { + s := new(HttpSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index 20aa60c..e2f8995 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -68,8 +68,8 @@ func (s *InfluxAsyncSink) connect() error { return nil } -func (s *InfluxAsyncSink) Init(config json.RawMessage) error { - s.name = "InfluxSink" +func (s *InfluxAsyncSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("InfluxSink(%s)", name) // Set default for maximum number of points sent to server in single request. s.config.BatchSize = 100 @@ -118,3 +118,9 @@ func (s *InfluxAsyncSink) Close() { s.writeApi.Flush() s.client.Close() } + +func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { + s := new(InfluxAsyncSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 99304c0..f235054 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -57,8 +57,8 @@ func (s *InfluxSink) connect() error { return nil } -func (s *InfluxSink) Init(config json.RawMessage) error { - s.name = "InfluxSink" +func (s *InfluxSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("InfluxSink(%s)", name) if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -94,3 +94,9 @@ func (s *InfluxSink) Close() { cclog.ComponentDebug(s.name, "Closing InfluxDB connection") s.client.Close() } + +func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { + s := new(InfluxSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/libgangliaSink.go b/sinks/libgangliaSink.go index 7136e42..9a45df4 100644 --- a/sinks/libgangliaSink.go +++ b/sinks/libgangliaSink.go @@ -109,9 +109,9 @@ type LibgangliaSink struct { cstrCache map[string]*C.char } -func (s *LibgangliaSink) Init(config json.RawMessage) error { +func (s *LibgangliaSink) Init(name string, config json.RawMessage) error { var err error = nil - s.name = "LibgangliaSink" + s.name = fmt.Sprintf("LibgangliaSink(%s)", name) //s.config.AddTagsAsDesc = false s.config.AddGangliaGroup = false s.config.AddTypeToName = false @@ -316,3 +316,9 @@ func (s *LibgangliaSink) Close() { C.free(unsafe.Pointer(cstr)) } } + +func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) { + s := new(LibgangliaSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/metricSink.go b/sinks/metricSink.go index d76f5f2..4583a2c 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -17,7 +17,7 @@ type sink struct { } type Sink interface { - Init(config json.RawMessage) error + Init(name string, config json.RawMessage) error Write(point lp.CCMetric) error Flush() error Close() diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 187157e..6087da0 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -53,8 +53,8 @@ func (s *NatsSink) connect() error { return nil } -func (s *NatsSink) Init(config json.RawMessage) error { - s.name = "NatsSink" +func (s *NatsSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("NatsSink(%s)", name) if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -105,3 +105,9 @@ func (s *NatsSink) Close() { s.client.Close() } } + +func NewNatsSink(name string, config json.RawMessage) (Sink, error) { + s := new(NatsSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 487e7ca..f531f5d 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -13,14 +13,14 @@ import ( const SINK_MAX_FORWARD = 50 // Map of all available sinks -var AvailableSinks = map[string]Sink{ - "influxdb": new(InfluxSink), - "stdout": new(StdoutSink), - "nats": new(NatsSink), - "http": new(HttpSink), - "ganglia": new(GangliaSink), - "influxasync": new(InfluxAsyncSink), - "libganglia": new(LibgangliaSink), +var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink, error){ + "ganglia": NewGangliaSink, + "libganglia": NewLibgangliaSink, + "stdout": NewStdoutSink, + "nats": NewNatsSink, + "influxdb": NewInfluxSink, + "influxasync": NewInfluxAsyncSink, + "http": NewHttpSink, } // Metric collector manager data structure @@ -149,8 +149,7 @@ func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error { cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type) return err } - s := AvailableSinks[sinkConfig.Type] - err = s.Init(rawConfig) + s, err := AvailableSinks[sinkConfig.Type](name, rawConfig) if err != nil { cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error()) return err diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index 5d0761a..d6c2e1b 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -19,8 +19,8 @@ type StdoutSink struct { } } -func (s *StdoutSink) Init(config json.RawMessage) error { - s.name = "StdoutSink" +func (s *StdoutSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("StdoutSink(%s)", name) if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -65,3 +65,9 @@ func (s *StdoutSink) Close() { s.output.Close() } } + +func NewStdoutSink(name string, config json.RawMessage) (Sink, error) { + s := new(StdoutSink) + s.Init(name, config) + return s, nil +} From 24e12ccc57eeb71dc9aafb43cb21dec935eb0fdc Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 22 Feb 2022 16:19:46 +0100 Subject: [PATCH 2/6] Update sink README and SampleSink --- sinks/README.md | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/sinks/README.md b/sinks/README.md index 8ff3743..65d2851 100644 --- a/sinks/README.md +++ b/sinks/README.md @@ -6,6 +6,7 @@ This folder contains the SinkManager and sink implementations for the cc-metric- - [`stdout`](./stdoutSink.md): Print all metrics to `stdout`, `stderr` or a file - [`http`](./httpSink.md): Send metrics to an HTTP server as POST requests - [`influxdb`](./influxSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database +- [`influxasync`](./influxAsyncSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database with non-blocking write API - [`nats`](./natsSink.md): Publish metrics to the [NATS](https://nats.io/) network overlay system - [`ganglia`](./gangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) using the `gmetric` CLI tool - [`libganglia`](./libgangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) directly using `libganglia.so` @@ -34,11 +35,12 @@ The configuration file for the sinks is a list of configurations. The `type` fie # Contributing own sinks -A sink contains four functions and is derived from the type `sink`: -* `Init(config json.RawMessage) error` +A sink contains five functions and is derived from the type `sink`: +* `Init(name string, config json.RawMessage) error` * `Write(point CCMetric) error` * `Flush() error` * `Close()` +* `New(name string, config json.RawMessage) (Sink, error)` (calls the `Init()` function) The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function writes/sends the data. For non-blocking sinks, the `Flush()` method tells the sink to drain its internal buffers. The `Close()` function should tear down anything created in `Init()`. @@ -65,8 +67,8 @@ type SampleSink struct { } // Initialize the sink by giving it a name and reading in the config JSON -func (s *SampleSink) Init(config json.RawMessage) error { - s.name = "SampleSink" // Always specify a name here +func (s *SampleSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here // Read in the config JSON if len(config) > 0 { err := json.Unmarshal(config, &s.config) @@ -91,4 +93,13 @@ func (s *SampleSink) Flush() error { // Close sink: close network connection, close files, close libraries, ... func (s *SampleSink) Close() {} + + +// New function to create a new instance of the sink +func NewSampleSink(name string, config json.RawMessage) (Sink, error) { + s := new(SampleSink) + err := s.Init(name, config) + return s, err +} + ``` \ No newline at end of file From 3598aed0909b033d728d14c55e424a1c5f7295ee Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 22 Feb 2022 16:33:38 +0100 Subject: [PATCH 3/6] Use new receiver instances to allow multiple of same receiver type --- receivers/README.md | 22 ++++++++++------------ receivers/metricReceiver.go | 5 ++--- receivers/natsReceiver.go | 9 +++++++-- receivers/receiveManager.go | 7 +++---- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/receivers/README.md b/receivers/README.md index 24425f2..49015d3 100644 --- a/receivers/README.md +++ b/receivers/README.md @@ -7,14 +7,11 @@ This folder contains the ReceiveManager and receiver implementations for the cc- The configuration file for the receivers is a list of configurations. The `type` field in each specifies which receiver to initialize. ```json -[ - { - "type": "nats", - "address": "nats://my-url", - "port" : "4222", - "database": "testcluster" +{ + "myreceivername" : { + } -] +} ``` @@ -25,20 +22,21 @@ The configuration file for the receivers is a list of configurations. The `type` "type": "nats", "address": "", "port" : "", - "database": "" + "subject": "" } ``` The `nats` receiver subscribes to the topic `database` and listens on `address` and `port` for metrics in the InfluxDB line protocol. # Contributing own receivers -A receiver contains three functions and is derived from the type `Receiver` (in `metricReceiver.go`): -* `Init(config ReceiverConfig) error` +A receiver contains a few functions and is derived from the type `Receiver` (in `metricReceiver.go`): +* `Init(name string, config json.RawMessage) error` * `Start() error` * `Close()` * `Name() string` -* `SetSink(sink chan ccMetric.CCMetric)` +* `SetSink(sink chan lp.CCMetric)` +* `New(name string, config json.RawMessage)` The data structures should be set up in `Init()` like opening a file or server connection. The `Start()` function should either start a go routine or issue some other asynchronous mechanism for receiving metrics. The `Close()` function should tear down anything created in `Init()`. -Finally, the receiver needs to be registered in the `receiveManager.go`. There is a list of receivers called `AvailableReceivers` which is a map (`receiver_type_string` -> `pointer to Receiver interface`). Add a new entry with a descriptive name and the new receiver. +Finally, the receiver needs to be registered in the `receiveManager.go`. There is a list of receivers called `AvailableReceivers` which is a map (`receiver_type_string` -> `pointer to NewReceiver function`). Add a new entry with a descriptive name and the new receiver. diff --git a/receivers/metricReceiver.go b/receivers/metricReceiver.go index c712186..e1a384a 100644 --- a/receivers/metricReceiver.go +++ b/receivers/metricReceiver.go @@ -20,9 +20,8 @@ type ReceiverConfig struct { } type receiver struct { - typename string - name string - sink chan lp.CCMetric + name string + sink chan lp.CCMetric } type Receiver interface { diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index dc96971..6114ecd 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -33,8 +33,7 @@ var DefaultTime = func() time.Time { } func (r *NatsReceiver) Init(name string, config json.RawMessage) error { - r.typename = "NatsReceiver" - r.name = name + r.name = fmt.Sprintf("NatsReceiver(%s)", name) r.config.Addr = nats.DefaultURL r.config.Port = "4222" if len(config) > 0 { @@ -91,3 +90,9 @@ func (r *NatsReceiver) Close() { r.nc.Close() } } + +func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) { + r := new(NatsReceiver) + err := r.Init(name, config) + return r, err +} diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go index 7141170..7b1a8fe 100644 --- a/receivers/receiveManager.go +++ b/receivers/receiveManager.go @@ -9,8 +9,8 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) -var AvailableReceivers = map[string]Receiver{ - "nats": &NatsReceiver{}, +var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){ + "nats": NewNatsReceiver, } type receiveManager struct { @@ -75,8 +75,7 @@ func (rm *receiveManager) AddInput(name string, rawConfig json.RawMessage) error cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "unknown receiver:", err.Error()) return err } - r := AvailableReceivers[config.Type] - err = r.Init(name, rawConfig) + r, err := AvailableReceivers[config.Type](name, rawConfig) if err != nil { cclog.ComponentError("ReceiveManager", "SKIP", r.Name(), "initialization failed:", err.Error()) return err From 73981527d3392408a3be52194e8dadd252e9406d Mon Sep 17 00:00:00 2001 From: Holger Obermaier Date: Wed, 23 Feb 2022 14:56:29 +0100 Subject: [PATCH 4/6] Refactor: Embed Init() into New() function --- sinks/gangliaSink.go | 66 ++++++++++------------ sinks/httpSink.go | 98 ++++++++++++++++----------------- sinks/influxAsyncSink.go | 75 ++++++++++++------------- sinks/influxSink.go | 40 +++++++------- sinks/libgangliaSink.go | 115 +++++++++++++++++++-------------------- sinks/metricSink.go | 3 - sinks/natsSink.go | 48 ++++++++-------- sinks/stdoutSink.go | 54 +++++++++--------- 8 files changed, 234 insertions(+), 265 deletions(-) diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index 0c9459b..8431011 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -33,41 +33,6 @@ type GangliaSink struct { config GangliaSinkConfig } -func (s *GangliaSink) Init(name string, config json.RawMessage) error { - var err error = nil - s.name = fmt.Sprintf("GangliaSink(%s)", name) - s.config.AddTagsAsDesc = false - s.config.AddGangliaGroup = false - if len(config) > 0 { - err := json.Unmarshal(config, &s.config) - if err != nil { - cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error()) - return err - } - } - s.gmetric_path = "" - s.gmetric_config = "" - if len(s.config.GmetricPath) > 0 { - p, err := exec.LookPath(s.config.GmetricPath) - if err == nil { - s.gmetric_path = p - } - } - if len(s.gmetric_path) == 0 { - p, err := exec.LookPath(string(GMETRIC_EXEC)) - if err == nil { - s.gmetric_path = p - } - } - if len(s.gmetric_path) == 0 { - err = errors.New("cannot find executable 'gmetric'") - } - if len(s.config.GmetricConfig) > 0 { - s.gmetric_config = s.config.GmetricConfig - } - return err -} - func (s *GangliaSink) Write(point lp.CCMetric) error { var err error = nil var tagsstr []string @@ -170,6 +135,35 @@ func (s *GangliaSink) Close() { func NewGangliaSink(name string, config json.RawMessage) (Sink, error) { s := new(GangliaSink) - s.Init(name, config) + s.name = fmt.Sprintf("GangliaSink(%s)", name) + s.config.AddTagsAsDesc = false + s.config.AddGangliaGroup = false + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error()) + return nil, err + } + } + s.gmetric_path = "" + s.gmetric_config = "" + if len(s.config.GmetricPath) > 0 { + p, err := exec.LookPath(s.config.GmetricPath) + if err == nil { + s.gmetric_path = p + } + } + if len(s.gmetric_path) == 0 { + p, err := exec.LookPath(string(GMETRIC_EXEC)) + if err == nil { + s.gmetric_path = p + } + } + if len(s.gmetric_path) == 0 { + return nil, errors.New("cannot find executable 'gmetric'") + } + if len(s.config.GmetricConfig) > 0 { + s.gmetric_config = s.config.GmetricConfig + } return s, nil } diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 41a5919..c2dd2ea 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -38,57 +38,6 @@ type HttpSink struct { flushDelay time.Duration } -func (s *HttpSink) Init(name string, config json.RawMessage) error { - // Set default values - s.name = fmt.Sprintf("HttpSink(%s)", name) - s.config.MaxIdleConns = 10 - s.config.IdleConnTimeout = "5s" - s.config.Timeout = "5s" - s.config.FlushDelay = "1s" - - // Read config - if len(config) > 0 { - err := json.Unmarshal(config, &s.config) - if err != nil { - return err - } - } - if len(s.config.URL) == 0 { - return errors.New("`url` config option is required for HTTP sink") - } - if s.config.MaxIdleConns > 0 { - s.maxIdleConns = s.config.MaxIdleConns - } - if len(s.config.IdleConnTimeout) > 0 { - t, err := time.ParseDuration(s.config.IdleConnTimeout) - if err == nil { - s.idleConnTimeout = t - } - } - if len(s.config.Timeout) > 0 { - t, err := time.ParseDuration(s.config.Timeout) - if err == nil { - s.timeout = t - } - } - if len(s.config.FlushDelay) > 0 { - t, err := time.ParseDuration(s.config.FlushDelay) - if err == nil { - s.flushDelay = t - } - } - tr := &http.Transport{ - MaxIdleConns: s.maxIdleConns, - IdleConnTimeout: s.idleConnTimeout, - } - s.client = &http.Client{Transport: tr, Timeout: s.timeout} - s.buffer = &bytes.Buffer{} - s.encoder = influx.NewEncoder(s.buffer) - s.encoder.SetPrecision(time.Second) - - return nil -} - func (s *HttpSink) Write(m lp.CCMetric) error { if s.buffer.Len() == 0 && s.flushDelay != 0 { // This is the first write since the last flush, start the flushTimer! @@ -172,6 +121,51 @@ func (s *HttpSink) Close() { func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s := new(HttpSink) - s.Init(name, config) + // Set default values + s.name = fmt.Sprintf("HttpSink(%s)", name) + s.config.MaxIdleConns = 10 + s.config.IdleConnTimeout = "5s" + s.config.Timeout = "5s" + s.config.FlushDelay = "1s" + + // Read config + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return nil, err + } + } + if len(s.config.URL) == 0 { + return nil, errors.New("`url` config option is required for HTTP sink") + } + if s.config.MaxIdleConns > 0 { + s.maxIdleConns = s.config.MaxIdleConns + } + if len(s.config.IdleConnTimeout) > 0 { + t, err := time.ParseDuration(s.config.IdleConnTimeout) + if err == nil { + s.idleConnTimeout = t + } + } + if len(s.config.Timeout) > 0 { + t, err := time.ParseDuration(s.config.Timeout) + if err == nil { + s.timeout = t + } + } + if len(s.config.FlushDelay) > 0 { + t, err := time.ParseDuration(s.config.FlushDelay) + if err == nil { + s.flushDelay = t + } + } + tr := &http.Transport{ + MaxIdleConns: s.maxIdleConns, + IdleConnTimeout: s.idleConnTimeout, + } + s.client = &http.Client{Transport: tr, Timeout: s.timeout} + s.buffer = &bytes.Buffer{} + s.encoder = influx.NewEncoder(s.buffer) + s.encoder.SetPrecision(time.Second) return s, nil } diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index e2f8995..81b7f78 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -30,11 +30,10 @@ type InfluxAsyncSinkConfig struct { type InfluxAsyncSink struct { sink - client influxdb2.Client - writeApi influxdb2Api.WriteAPI - retPolicy string - errors <-chan error - config InfluxAsyncSinkConfig + client influxdb2.Client + writeApi influxdb2Api.WriteAPI + errors <-chan error + config InfluxAsyncSinkConfig } func (s *InfluxAsyncSink) connect() error { @@ -68,39 +67,6 @@ func (s *InfluxAsyncSink) connect() error { return nil } -func (s *InfluxAsyncSink) Init(name string, config json.RawMessage) error { - s.name = fmt.Sprintf("InfluxSink(%s)", name) - - // Set default for maximum number of points sent to server in single request. - s.config.BatchSize = 100 - - if len(config) > 0 { - err := json.Unmarshal(config, &s.config) - if err != nil { - return err - } - } - if len(s.config.Host) == 0 || - len(s.config.Port) == 0 || - len(s.config.Database) == 0 || - len(s.config.Organization) == 0 || - len(s.config.Password) == 0 { - return errors.New("not all configuration variables set required by InfluxAsyncSink") - } - - // Connect to InfluxDB server - err := s.connect() - - // Start background: Read from error channel - s.errors = s.writeApi.Errors() - go func() { - for err := range s.errors { - cclog.ComponentError(s.name, err.Error()) - } - }() - return err -} - func (s *InfluxAsyncSink) Write(m lp.CCMetric) error { s.writeApi.WritePoint( m.ToPoint(s.config.MetaAsTags), @@ -121,6 +87,37 @@ func (s *InfluxAsyncSink) Close() { func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { s := new(InfluxAsyncSink) - s.Init(name, config) + s.name = fmt.Sprintf("InfluxSink(%s)", name) + + // Set default for maximum number of points sent to server in single request. + s.config.BatchSize = 100 + + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return nil, err + } + } + if len(s.config.Host) == 0 || + len(s.config.Port) == 0 || + len(s.config.Database) == 0 || + len(s.config.Organization) == 0 || + len(s.config.Password) == 0 { + return nil, errors.New("not all configuration variables set required by InfluxAsyncSink") + } + + // Connect to InfluxDB server + if err := s.connect(); err != nil { + return nil, fmt.Errorf("Unable to connect: %v", err) + } + + // Start background: Read from error channel + s.errors = s.writeApi.Errors() + go func() { + for err := range s.errors { + cclog.ComponentError(s.name, err.Error()) + } + }() + return s, nil } diff --git a/sinks/influxSink.go b/sinks/influxSink.go index f235054..d156585 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -57,26 +57,6 @@ func (s *InfluxSink) connect() error { return nil } -func (s *InfluxSink) Init(name string, config json.RawMessage) error { - s.name = fmt.Sprintf("InfluxSink(%s)", name) - if len(config) > 0 { - err := json.Unmarshal(config, &s.config) - if err != nil { - return err - } - } - if len(s.config.Host) == 0 || - len(s.config.Port) == 0 || - len(s.config.Database) == 0 || - len(s.config.Organization) == 0 || - len(s.config.Password) == 0 { - return errors.New("not all configuration variables set required by InfluxSink") - } - - // Connect to InfluxDB server - return s.connect() -} - func (s *InfluxSink) Write(m lp.CCMetric) error { err := s.writeApi.WritePoint( @@ -97,6 +77,24 @@ func (s *InfluxSink) Close() { func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { s := new(InfluxSink) - s.Init(name, config) + s.name = fmt.Sprintf("InfluxSink(%s)", name) + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return nil, err + } + } + if len(s.config.Host) == 0 || + len(s.config.Port) == 0 || + len(s.config.Database) == 0 || + len(s.config.Organization) == 0 || + len(s.config.Password) == 0 { + return nil, errors.New("not all configuration variables set required by InfluxSink") + } + + // Connect to InfluxDB server + if err := s.connect(); err != nil { + return nil, fmt.Errorf("Unable to connect: %v", err) + } return s, nil } diff --git a/sinks/libgangliaSink.go b/sinks/libgangliaSink.go index 9a45df4..5fd1eb8 100644 --- a/sinks/libgangliaSink.go +++ b/sinks/libgangliaSink.go @@ -109,65 +109,6 @@ type LibgangliaSink struct { cstrCache map[string]*C.char } -func (s *LibgangliaSink) Init(name string, config json.RawMessage) error { - var err error = nil - s.name = fmt.Sprintf("LibgangliaSink(%s)", name) - //s.config.AddTagsAsDesc = false - s.config.AddGangliaGroup = false - s.config.AddTypeToName = false - s.config.AddUnits = true - s.config.GmondConfig = string(GMOND_CONFIG_FILE) - s.config.GangliaLib = string(GANGLIA_LIB_NAME) - if len(config) > 0 { - err = json.Unmarshal(config, &s.config) - if err != nil { - cclog.ComponentError(s.name, "Error reading config:", err.Error()) - return err - } - } - lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS) - if lib == nil { - return fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib) - } - err = lib.Open() - if err != nil { - return fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err) - } - - // Set up cache for the C strings - s.cstrCache = make(map[string]*C.char) - // s.cstrCache["globals"] = C.CString("globals") - - // s.cstrCache["override_hostname"] = C.CString("override_hostname") - // s.cstrCache["override_ip"] = C.CString("override_ip") - - // Add some constant strings - s.cstrCache["GROUP"] = C.CString("GROUP") - s.cstrCache["CLUSTER"] = C.CString("CLUSTER") - s.cstrCache[""] = C.CString("") - - // Add cluster name for lookup in Write() - if len(s.config.ClusterName) > 0 { - s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName) - } - // Add supported types for later lookup in Write() - s.cstrCache["double"] = C.CString("double") - s.cstrCache["int32"] = C.CString("int32") - s.cstrCache["string"] = C.CString("string") - - // Create Ganglia pool - s.global_context = C.Ganglia_pool_create(nil) - // Load Ganglia configuration - s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig) - s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0) - //globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"]) - //override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"]) - //override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"]) - - s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config) - return nil -} - func (s *LibgangliaSink) Write(point lp.CCMetric) error { var err error = nil var c_name *C.char @@ -319,6 +260,60 @@ func (s *LibgangliaSink) Close() { func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) { s := new(LibgangliaSink) - s.Init(name, config) + var err error = nil + s.name = fmt.Sprintf("LibgangliaSink(%s)", name) + //s.config.AddTagsAsDesc = false + s.config.AddGangliaGroup = false + s.config.AddTypeToName = false + s.config.AddUnits = true + s.config.GmondConfig = string(GMOND_CONFIG_FILE) + s.config.GangliaLib = string(GANGLIA_LIB_NAME) + if len(config) > 0 { + err = json.Unmarshal(config, &s.config) + if err != nil { + cclog.ComponentError(s.name, "Error reading config:", err.Error()) + return nil, err + } + } + lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS) + if lib == nil { + return nil, fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib) + } + err = lib.Open() + if err != nil { + return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err) + } + + // Set up cache for the C strings + s.cstrCache = make(map[string]*C.char) + // s.cstrCache["globals"] = C.CString("globals") + + // s.cstrCache["override_hostname"] = C.CString("override_hostname") + // s.cstrCache["override_ip"] = C.CString("override_ip") + + // Add some constant strings + s.cstrCache["GROUP"] = C.CString("GROUP") + s.cstrCache["CLUSTER"] = C.CString("CLUSTER") + s.cstrCache[""] = C.CString("") + + // Add cluster name for lookup in Write() + if len(s.config.ClusterName) > 0 { + s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName) + } + // Add supported types for later lookup in Write() + s.cstrCache["double"] = C.CString("double") + s.cstrCache["int32"] = C.CString("int32") + s.cstrCache["string"] = C.CString("string") + + // Create Ganglia pool + s.global_context = C.Ganglia_pool_create(nil) + // Load Ganglia configuration + s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig) + s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0) + //globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"]) + //override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"]) + //override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"]) + + s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config) return s, nil } diff --git a/sinks/metricSink.go b/sinks/metricSink.go index 4583a2c..8fe02d7 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -1,8 +1,6 @@ package sinks import ( - "encoding/json" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) @@ -17,7 +15,6 @@ type sink struct { } type Sink interface { - Init(name string, config json.RawMessage) error Write(point lp.CCMetric) error Flush() error Close() diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 6087da0..0d7987e 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -53,30 +53,6 @@ func (s *NatsSink) connect() error { return nil } -func (s *NatsSink) Init(name string, config json.RawMessage) error { - s.name = fmt.Sprintf("NatsSink(%s)", name) - if len(config) > 0 { - err := json.Unmarshal(config, &s.config) - if err != nil { - cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error()) - return err - } - } - if len(s.config.Host) == 0 || - len(s.config.Port) == 0 || - len(s.config.Database) == 0 { - return errors.New("not all configuration variables set required by NatsSink") - } - // Setup Influx line protocol - s.buffer = &bytes.Buffer{} - s.buffer.Grow(1025) - s.encoder = influx.NewEncoder(s.buffer) - s.encoder.SetPrecision(time.Second) - s.encoder.SetMaxLineBytes(1024) - // Setup infos for connection - return s.connect() -} - func (s *NatsSink) Write(m lp.CCMetric) error { if s.client != nil { _, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags)) @@ -108,6 +84,28 @@ func (s *NatsSink) Close() { func NewNatsSink(name string, config json.RawMessage) (Sink, error) { s := new(NatsSink) - s.Init(name, config) + s.name = fmt.Sprintf("NatsSink(%s)", name) + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error()) + return nil, err + } + } + if len(s.config.Host) == 0 || + len(s.config.Port) == 0 || + len(s.config.Database) == 0 { + return nil, errors.New("not all configuration variables set required by NatsSink") + } + // Setup Influx line protocol + s.buffer = &bytes.Buffer{} + s.buffer.Grow(1025) + s.encoder = influx.NewEncoder(s.buffer) + s.encoder.SetPrecision(time.Second) + s.encoder.SetMaxLineBytes(1024) + // Setup infos for connection + if err := s.connect(); err != nil { + return nil, fmt.Errorf("Unable to connect: %v", err) + } return s, nil } diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index d6c2e1b..acf2621 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -19,34 +19,6 @@ type StdoutSink struct { } } -func (s *StdoutSink) Init(name string, config json.RawMessage) error { - s.name = fmt.Sprintf("StdoutSink(%s)", name) - if len(config) > 0 { - err := json.Unmarshal(config, &s.config) - if err != nil { - return err - } - } - - s.output = os.Stdout - if len(s.config.Output) > 0 { - switch strings.ToLower(s.config.Output) { - case "stdout": - s.output = os.Stdout - case "stderr": - s.output = os.Stderr - default: - f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600)) - if err != nil { - return err - } - s.output = f - } - } - s.meta_as_tags = s.config.MetaAsTags - return nil -} - func (s *StdoutSink) Write(m lp.CCMetric) error { fmt.Fprint( s.output, @@ -68,6 +40,30 @@ func (s *StdoutSink) Close() { func NewStdoutSink(name string, config json.RawMessage) (Sink, error) { s := new(StdoutSink) - s.Init(name, config) + s.name = fmt.Sprintf("StdoutSink(%s)", name) + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return nil, err + } + } + + s.output = os.Stdout + if len(s.config.Output) > 0 { + switch strings.ToLower(s.config.Output) { + case "stdout": + s.output = os.Stdout + case "stderr": + s.output = os.Stderr + default: + f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600)) + if err != nil { + return nil, err + } + s.output = f + } + } + s.meta_as_tags = s.config.MetaAsTags + return s, nil } From 2f363754700a134a8d2e32a8b7e82933890ea267 Mon Sep 17 00:00:00 2001 From: Holger Obermaier Date: Wed, 23 Feb 2022 15:15:17 +0100 Subject: [PATCH 5/6] Refactor: Embed Init() into New() function --- receivers/README.md | 2 -- receivers/metricReceiver.go | 4 --- receivers/natsReceiver.go | 62 +++++++++++++++++-------------------- 3 files changed, 28 insertions(+), 40 deletions(-) diff --git a/receivers/README.md b/receivers/README.md index 49015d3..808dc74 100644 --- a/receivers/README.md +++ b/receivers/README.md @@ -14,7 +14,6 @@ The configuration file for the receivers is a list of configurations. The `type` } ``` - ## Type `nats` ```json @@ -30,7 +29,6 @@ The `nats` receiver subscribes to the topic `database` and listens on `address` # Contributing own receivers A receiver contains a few functions and is derived from the type `Receiver` (in `metricReceiver.go`): -* `Init(name string, config json.RawMessage) error` * `Start() error` * `Close()` * `Name() string` diff --git a/receivers/metricReceiver.go b/receivers/metricReceiver.go index e1a384a..e133354 100644 --- a/receivers/metricReceiver.go +++ b/receivers/metricReceiver.go @@ -1,9 +1,6 @@ package receivers import ( - // "time" - "encoding/json" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) @@ -25,7 +22,6 @@ type receiver struct { } type Receiver interface { - Init(name string, config json.RawMessage) error Start() Close() Name() string diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index 6114ecd..1a5f47b 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -32,38 +32,6 @@ var DefaultTime = func() time.Time { return time.Unix(42, 0) } -func (r *NatsReceiver) Init(name string, config json.RawMessage) error { - r.name = fmt.Sprintf("NatsReceiver(%s)", name) - r.config.Addr = nats.DefaultURL - r.config.Port = "4222" - if len(config) > 0 { - err := json.Unmarshal(config, &r.config) - if err != nil { - cclog.ComponentError(r.name, "Error reading config:", err.Error()) - return err - } - } - if len(r.config.Addr) == 0 || - len(r.config.Port) == 0 || - len(r.config.Subject) == 0 { - return errors.New("not all configuration variables set required by NatsReceiver") - } - r.meta = map[string]string{"source": r.name} - uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port) - cclog.ComponentDebug(r.name, "INIT", uri, "Subject", r.config.Subject) - nc, err := nats.Connect(uri) - if err == nil { - r.nc = nc - } else { - r.nc = nil - return err - } - r.handler = influx.NewMetricHandler() - r.parser = influx.NewParser(r.handler) - r.parser.SetTimeFunc(DefaultTime) - return err -} - func (r *NatsReceiver) Start() { cclog.ComponentDebug(r.name, "START") r.nc.Subscribe(r.config.Subject, r._NatsReceive) @@ -93,6 +61,32 @@ func (r *NatsReceiver) Close() { func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) { r := new(NatsReceiver) - err := r.Init(name, config) - return r, err + r.name = fmt.Sprintf("NatsReceiver(%s)", name) + r.config.Addr = nats.DefaultURL + r.config.Port = "4222" + if len(config) > 0 { + err := json.Unmarshal(config, &r.config) + if err != nil { + cclog.ComponentError(r.name, "Error reading config:", err.Error()) + return nil, err + } + } + if len(r.config.Addr) == 0 || + len(r.config.Port) == 0 || + len(r.config.Subject) == 0 { + return nil, errors.New("not all configuration variables set required by NatsReceiver") + } + r.meta = map[string]string{"source": r.name} + uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port) + cclog.ComponentDebug(r.name, "NewNatsReceiver", uri, "Subject", r.config.Subject) + if nc, err := nats.Connect(uri); err == nil { + r.nc = nc + } else { + r.nc = nil + return nil, err + } + r.handler = influx.NewMetricHandler() + r.parser = influx.NewParser(r.handler) + r.parser.SetTimeFunc(DefaultTime) + return r, nil } From 2f044f4b586129da8f93db8814c3362547ac9a8e Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Wed, 23 Feb 2022 15:58:51 +0100 Subject: [PATCH 6/6] Fix: MetricReceiver uses uninitialized values, when initialization fails --- receivers/receiveManager.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go index 7b1a8fe..1c13026 100644 --- a/receivers/receiveManager.go +++ b/receivers/receiveManager.go @@ -30,11 +30,13 @@ type ReceiveManager interface { } func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) error { + // Initialize struct fields rm.inputs = make([]Receiver, 0) rm.output = nil rm.done = make(chan bool) rm.wg = wg rm.config = make([]json.RawMessage, 0) + configFile, err := os.Open(receiverConfigFile) if err != nil { cclog.ComponentError("ReceiveManager", err.Error()) @@ -51,6 +53,7 @@ func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) er for name, raw := range rawConfigs { rm.AddInput(name, raw) } + return nil } @@ -77,7 +80,7 @@ func (rm *receiveManager) AddInput(name string, rawConfig json.RawMessage) error } r, err := AvailableReceivers[config.Type](name, rawConfig) if err != nil { - cclog.ComponentError("ReceiveManager", "SKIP", r.Name(), "initialization failed:", err.Error()) + cclog.ComponentError("ReceiveManager", "SKIP", name, "initialization failed:", err.Error()) return err } rm.inputs = append(rm.inputs, r)