From 3598aed0909b033d728d14c55e424a1c5f7295ee Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 22 Feb 2022 16:33:38 +0100 Subject: [PATCH 1/2] Use new receiver instances to allow multiple of same receiver type --- receivers/README.md | 22 ++++++++++------------ receivers/metricReceiver.go | 5 ++--- receivers/natsReceiver.go | 9 +++++++-- receivers/receiveManager.go | 7 +++---- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/receivers/README.md b/receivers/README.md index 24425f2..49015d3 100644 --- a/receivers/README.md +++ b/receivers/README.md @@ -7,14 +7,11 @@ This folder contains the ReceiveManager and receiver implementations for the cc- The configuration file for the receivers is a list of configurations. The `type` field in each specifies which receiver to initialize. ```json -[ - { - "type": "nats", - "address": "nats://my-url", - "port" : "4222", - "database": "testcluster" +{ + "myreceivername" : { + } -] +} ``` @@ -25,20 +22,21 @@ The configuration file for the receivers is a list of configurations. The `type` "type": "nats", "address": "", "port" : "", - "database": "" + "subject": "" } ``` The `nats` receiver subscribes to the topic `database` and listens on `address` and `port` for metrics in the InfluxDB line protocol. # Contributing own receivers -A receiver contains three functions and is derived from the type `Receiver` (in `metricReceiver.go`): -* `Init(config ReceiverConfig) error` +A receiver contains a few functions and is derived from the type `Receiver` (in `metricReceiver.go`): +* `Init(name string, config json.RawMessage) error` * `Start() error` * `Close()` * `Name() string` -* `SetSink(sink chan ccMetric.CCMetric)` +* `SetSink(sink chan lp.CCMetric)` +* `New(name string, config json.RawMessage)` The data structures should be set up in `Init()` like opening a file or server connection. The `Start()` function should either start a go routine or issue some other asynchronous mechanism for receiving metrics. The `Close()` function should tear down anything created in `Init()`. -Finally, the receiver needs to be registered in the `receiveManager.go`. There is a list of receivers called `AvailableReceivers` which is a map (`receiver_type_string` -> `pointer to Receiver interface`). Add a new entry with a descriptive name and the new receiver. +Finally, the receiver needs to be registered in the `receiveManager.go`. There is a list of receivers called `AvailableReceivers` which is a map (`receiver_type_string` -> `pointer to NewReceiver function`). Add a new entry with a descriptive name and the new receiver. diff --git a/receivers/metricReceiver.go b/receivers/metricReceiver.go index c712186..e1a384a 100644 --- a/receivers/metricReceiver.go +++ b/receivers/metricReceiver.go @@ -20,9 +20,8 @@ type ReceiverConfig struct { } type receiver struct { - typename string - name string - sink chan lp.CCMetric + name string + sink chan lp.CCMetric } type Receiver interface { diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index dc96971..6114ecd 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -33,8 +33,7 @@ var DefaultTime = func() time.Time { } func (r *NatsReceiver) Init(name string, config json.RawMessage) error { - r.typename = "NatsReceiver" - r.name = name + r.name = fmt.Sprintf("NatsReceiver(%s)", name) r.config.Addr = nats.DefaultURL r.config.Port = "4222" if len(config) > 0 { @@ -91,3 +90,9 @@ func (r *NatsReceiver) Close() { r.nc.Close() } } + +func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) { + r := new(NatsReceiver) + err := r.Init(name, config) + return r, err +} diff --git a/receivers/receiveManager.go b/receivers/receiveManager.go index 7141170..7b1a8fe 100644 --- a/receivers/receiveManager.go +++ b/receivers/receiveManager.go @@ -9,8 +9,8 @@ import ( lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) -var AvailableReceivers = map[string]Receiver{ - "nats": &NatsReceiver{}, +var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){ + "nats": NewNatsReceiver, } type receiveManager struct { @@ -75,8 +75,7 @@ func (rm *receiveManager) AddInput(name string, rawConfig json.RawMessage) error cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "unknown receiver:", err.Error()) return err } - r := AvailableReceivers[config.Type] - err = r.Init(name, rawConfig) + r, err := AvailableReceivers[config.Type](name, rawConfig) if err != nil { cclog.ComponentError("ReceiveManager", "SKIP", r.Name(), "initialization failed:", err.Error()) return err From 2f363754700a134a8d2e32a8b7e82933890ea267 Mon Sep 17 00:00:00 2001 From: Holger Obermaier Date: Wed, 23 Feb 2022 15:15:17 +0100 Subject: [PATCH 2/2] Refactor: Embed Init() into New() function --- receivers/README.md | 2 -- receivers/metricReceiver.go | 4 --- receivers/natsReceiver.go | 62 +++++++++++++++++-------------------- 3 files changed, 28 insertions(+), 40 deletions(-) diff --git a/receivers/README.md b/receivers/README.md index 49015d3..808dc74 100644 --- a/receivers/README.md +++ b/receivers/README.md @@ -14,7 +14,6 @@ The configuration file for the receivers is a list of configurations. The `type` } ``` - ## Type `nats` ```json @@ -30,7 +29,6 @@ The `nats` receiver subscribes to the topic `database` and listens on `address` # Contributing own receivers A receiver contains a few functions and is derived from the type `Receiver` (in `metricReceiver.go`): -* `Init(name string, config json.RawMessage) error` * `Start() error` * `Close()` * `Name() string` diff --git a/receivers/metricReceiver.go b/receivers/metricReceiver.go index e1a384a..e133354 100644 --- a/receivers/metricReceiver.go +++ b/receivers/metricReceiver.go @@ -1,9 +1,6 @@ package receivers import ( - // "time" - "encoding/json" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) @@ -25,7 +22,6 @@ type receiver struct { } type Receiver interface { - Init(name string, config json.RawMessage) error Start() Close() Name() string diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index 6114ecd..1a5f47b 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -32,38 +32,6 @@ var DefaultTime = func() time.Time { return time.Unix(42, 0) } -func (r *NatsReceiver) Init(name string, config json.RawMessage) error { - r.name = fmt.Sprintf("NatsReceiver(%s)", name) - r.config.Addr = nats.DefaultURL - r.config.Port = "4222" - if len(config) > 0 { - err := json.Unmarshal(config, &r.config) - if err != nil { - cclog.ComponentError(r.name, "Error reading config:", err.Error()) - return err - } - } - if len(r.config.Addr) == 0 || - len(r.config.Port) == 0 || - len(r.config.Subject) == 0 { - return errors.New("not all configuration variables set required by NatsReceiver") - } - r.meta = map[string]string{"source": r.name} - uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port) - cclog.ComponentDebug(r.name, "INIT", uri, "Subject", r.config.Subject) - nc, err := nats.Connect(uri) - if err == nil { - r.nc = nc - } else { - r.nc = nil - return err - } - r.handler = influx.NewMetricHandler() - r.parser = influx.NewParser(r.handler) - r.parser.SetTimeFunc(DefaultTime) - return err -} - func (r *NatsReceiver) Start() { cclog.ComponentDebug(r.name, "START") r.nc.Subscribe(r.config.Subject, r._NatsReceive) @@ -93,6 +61,32 @@ func (r *NatsReceiver) Close() { func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) { r := new(NatsReceiver) - err := r.Init(name, config) - return r, err + r.name = fmt.Sprintf("NatsReceiver(%s)", name) + r.config.Addr = nats.DefaultURL + r.config.Port = "4222" + if len(config) > 0 { + err := json.Unmarshal(config, &r.config) + if err != nil { + cclog.ComponentError(r.name, "Error reading config:", err.Error()) + return nil, err + } + } + if len(r.config.Addr) == 0 || + len(r.config.Port) == 0 || + len(r.config.Subject) == 0 { + return nil, errors.New("not all configuration variables set required by NatsReceiver") + } + r.meta = map[string]string{"source": r.name} + uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port) + cclog.ComponentDebug(r.name, "NewNatsReceiver", uri, "Subject", r.config.Subject) + if nc, err := nats.Connect(uri); err == nil { + r.nc = nc + } else { + r.nc = nil + return nil, err + } + r.handler = influx.NewMetricHandler() + r.parser = influx.NewParser(r.handler) + r.parser.SetTimeFunc(DefaultTime) + return r, nil }