mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-31 00:55:06 +01:00 
			
		
		
		
	Merge pull request #43 from ClusterCockpit/new_receiver_instances
New receiver instances
This commit is contained in:
		| @@ -7,17 +7,13 @@ This folder contains the ReceiveManager and receiver implementations for the cc- | |||||||
| The configuration file for the receivers is a list of configurations. The `type` field in each specifies which receiver to initialize. | The configuration file for the receivers is a list of configurations. The `type` field in each specifies which receiver to initialize. | ||||||
|  |  | ||||||
| ```json | ```json | ||||||
| [ | { | ||||||
|   { |   "myreceivername" : { | ||||||
|     "type": "nats", |     <receiver-specific configuration> | ||||||
|     "address": "nats://my-url", |  | ||||||
|     "port" : "4222", |  | ||||||
|     "database": "testcluster" |  | ||||||
|   } |   } | ||||||
| ] | } | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
|  |  | ||||||
| ## Type `nats` | ## Type `nats` | ||||||
|  |  | ||||||
| ```json | ```json | ||||||
| @@ -25,20 +21,20 @@ The configuration file for the receivers is a list of configurations. The `type` | |||||||
|   "type": "nats", |   "type": "nats", | ||||||
|   "address": "<nats-URI or hostname>", |   "address": "<nats-URI or hostname>", | ||||||
|   "port" : "<portnumber>", |   "port" : "<portnumber>", | ||||||
|   "database": "<subscribe topic>" |   "subject": "<subscribe topic>" | ||||||
| } | } | ||||||
| ``` | ``` | ||||||
|  |  | ||||||
| The `nats` receiver subscribes to the topic `database` and listens on `address` and `port` for metrics in the InfluxDB line protocol. | The `nats` receiver subscribes to the topic `database` and listens on `address` and `port` for metrics in the InfluxDB line protocol. | ||||||
|  |  | ||||||
| # Contributing own receivers | # Contributing own receivers | ||||||
| A receiver contains three functions and is derived from the type `Receiver` (in `metricReceiver.go`): | A receiver contains a few functions and is derived from the type `Receiver` (in `metricReceiver.go`): | ||||||
| * `Init(config ReceiverConfig) error` |  | ||||||
| * `Start() error` | * `Start() error` | ||||||
| * `Close()` | * `Close()` | ||||||
| * `Name() string` | * `Name() string` | ||||||
| * `SetSink(sink chan ccMetric.CCMetric)` | * `SetSink(sink chan lp.CCMetric)` | ||||||
|  | * `New<Typename>(name string, config json.RawMessage)` | ||||||
|  |  | ||||||
| The data structures should be set up in `Init()` like opening a file or server connection. The `Start()` function should either start a go routine or issue some other asynchronous mechanism for receiving metrics. The `Close()` function should tear down anything created in `Init()`. | The data structures should be set up in `Init()` like opening a file or server connection. The `Start()` function should either start a go routine or issue some other asynchronous mechanism for receiving metrics. The `Close()` function should tear down anything created in `Init()`. | ||||||
|  |  | ||||||
| Finally, the receiver needs to be registered in the `receiveManager.go`. There is a list of receivers called `AvailableReceivers` which is a map (`receiver_type_string` -> `pointer to Receiver interface`). Add a new entry with a descriptive name and the new receiver. | Finally, the receiver needs to be registered in the `receiveManager.go`. There is a list of receivers called `AvailableReceivers` which is a map (`receiver_type_string` -> `pointer to NewReceiver function`). Add a new entry with a descriptive name and the new receiver. | ||||||
|   | |||||||
| @@ -1,9 +1,6 @@ | |||||||
| package receivers | package receivers | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	//	"time" |  | ||||||
| 	"encoding/json" |  | ||||||
|  |  | ||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -20,13 +17,11 @@ type ReceiverConfig struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| type receiver struct { | type receiver struct { | ||||||
| 	typename string |  | ||||||
| 	name string | 	name string | ||||||
| 	sink chan lp.CCMetric | 	sink chan lp.CCMetric | ||||||
| } | } | ||||||
|  |  | ||||||
| type Receiver interface { | type Receiver interface { | ||||||
| 	Init(name string, config json.RawMessage) error |  | ||||||
| 	Start() | 	Start() | ||||||
| 	Close() | 	Close() | ||||||
| 	Name() string | 	Name() string | ||||||
|   | |||||||
| @@ -32,39 +32,6 @@ var DefaultTime = func() time.Time { | |||||||
| 	return time.Unix(42, 0) | 	return time.Unix(42, 0) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *NatsReceiver) Init(name string, config json.RawMessage) error { |  | ||||||
| 	r.typename = "NatsReceiver" |  | ||||||
| 	r.name = name |  | ||||||
| 	r.config.Addr = nats.DefaultURL |  | ||||||
| 	r.config.Port = "4222" |  | ||||||
| 	if len(config) > 0 { |  | ||||||
| 		err := json.Unmarshal(config, &r.config) |  | ||||||
| 		if err != nil { |  | ||||||
| 			cclog.ComponentError(r.name, "Error reading config:", err.Error()) |  | ||||||
| 			return err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	if len(r.config.Addr) == 0 || |  | ||||||
| 		len(r.config.Port) == 0 || |  | ||||||
| 		len(r.config.Subject) == 0 { |  | ||||||
| 		return errors.New("not all configuration variables set required by NatsReceiver") |  | ||||||
| 	} |  | ||||||
| 	r.meta = map[string]string{"source": r.name} |  | ||||||
| 	uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port) |  | ||||||
| 	cclog.ComponentDebug(r.name, "INIT", uri, "Subject", r.config.Subject) |  | ||||||
| 	nc, err := nats.Connect(uri) |  | ||||||
| 	if err == nil { |  | ||||||
| 		r.nc = nc |  | ||||||
| 	} else { |  | ||||||
| 		r.nc = nil |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	r.handler = influx.NewMetricHandler() |  | ||||||
| 	r.parser = influx.NewParser(r.handler) |  | ||||||
| 	r.parser.SetTimeFunc(DefaultTime) |  | ||||||
| 	return err |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (r *NatsReceiver) Start() { | func (r *NatsReceiver) Start() { | ||||||
| 	cclog.ComponentDebug(r.name, "START") | 	cclog.ComponentDebug(r.name, "START") | ||||||
| 	r.nc.Subscribe(r.config.Subject, r._NatsReceive) | 	r.nc.Subscribe(r.config.Subject, r._NatsReceive) | ||||||
| @@ -91,3 +58,35 @@ func (r *NatsReceiver) Close() { | |||||||
| 		r.nc.Close() | 		r.nc.Close() | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) { | ||||||
|  | 	r := new(NatsReceiver) | ||||||
|  | 	r.name = fmt.Sprintf("NatsReceiver(%s)", name) | ||||||
|  | 	r.config.Addr = nats.DefaultURL | ||||||
|  | 	r.config.Port = "4222" | ||||||
|  | 	if len(config) > 0 { | ||||||
|  | 		err := json.Unmarshal(config, &r.config) | ||||||
|  | 		if err != nil { | ||||||
|  | 			cclog.ComponentError(r.name, "Error reading config:", err.Error()) | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if len(r.config.Addr) == 0 || | ||||||
|  | 		len(r.config.Port) == 0 || | ||||||
|  | 		len(r.config.Subject) == 0 { | ||||||
|  | 		return nil, errors.New("not all configuration variables set required by NatsReceiver") | ||||||
|  | 	} | ||||||
|  | 	r.meta = map[string]string{"source": r.name} | ||||||
|  | 	uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port) | ||||||
|  | 	cclog.ComponentDebug(r.name, "NewNatsReceiver", uri, "Subject", r.config.Subject) | ||||||
|  | 	if nc, err := nats.Connect(uri); err == nil { | ||||||
|  | 		r.nc = nc | ||||||
|  | 	} else { | ||||||
|  | 		r.nc = nil | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	r.handler = influx.NewMetricHandler() | ||||||
|  | 	r.parser = influx.NewParser(r.handler) | ||||||
|  | 	r.parser.SetTimeFunc(DefaultTime) | ||||||
|  | 	return r, nil | ||||||
|  | } | ||||||
|   | |||||||
| @@ -9,8 +9,8 @@ import ( | |||||||
| 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var AvailableReceivers = map[string]Receiver{ | var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){ | ||||||
| 	"nats": &NatsReceiver{}, | 	"nats": NewNatsReceiver, | ||||||
| } | } | ||||||
|  |  | ||||||
| type receiveManager struct { | type receiveManager struct { | ||||||
| @@ -75,8 +75,7 @@ func (rm *receiveManager) AddInput(name string, rawConfig json.RawMessage) error | |||||||
| 		cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "unknown receiver:", err.Error()) | 		cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "unknown receiver:", err.Error()) | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	r := AvailableReceivers[config.Type] | 	r, err := AvailableReceivers[config.Type](name, rawConfig) | ||||||
| 	err = r.Init(name, rawConfig) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		cclog.ComponentError("ReceiveManager", "SKIP", r.Name(), "initialization failed:", err.Error()) | 		cclog.ComponentError("ReceiveManager", "SKIP", r.Name(), "initialization failed:", err.Error()) | ||||||
| 		return err | 		return err | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user