mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-04-08 14:35:55 +02:00
Merge branch 'develop' into preconfigured_ganglia_metrics
This commit is contained in:
commit
b58cf56f65
@ -7,17 +7,13 @@ This folder contains the ReceiveManager and receiver implementations for the cc-
|
||||
The configuration file for the receivers is a list of configurations. The `type` field in each specifies which receiver to initialize.
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"type": "nats",
|
||||
"address": "nats://my-url",
|
||||
"port" : "4222",
|
||||
"database": "testcluster"
|
||||
{
|
||||
"myreceivername" : {
|
||||
<receiver-specific configuration>
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## Type `nats`
|
||||
|
||||
```json
|
||||
@ -25,20 +21,20 @@ The configuration file for the receivers is a list of configurations. The `type`
|
||||
"type": "nats",
|
||||
"address": "<nats-URI or hostname>",
|
||||
"port" : "<portnumber>",
|
||||
"database": "<subscribe topic>"
|
||||
"subject": "<subscribe topic>"
|
||||
}
|
||||
```
|
||||
|
||||
The `nats` receiver subscribes to the topic `database` and listens on `address` and `port` for metrics in the InfluxDB line protocol.
|
||||
|
||||
# Contributing own receivers
|
||||
A receiver contains three functions and is derived from the type `Receiver` (in `metricReceiver.go`):
|
||||
* `Init(config ReceiverConfig) error`
|
||||
A receiver contains a few functions and is derived from the type `Receiver` (in `metricReceiver.go`):
|
||||
* `Start() error`
|
||||
* `Close()`
|
||||
* `Name() string`
|
||||
* `SetSink(sink chan ccMetric.CCMetric)`
|
||||
* `SetSink(sink chan lp.CCMetric)`
|
||||
* `New<Typename>(name string, config json.RawMessage)`
|
||||
|
||||
The data structures should be set up in `Init()` like opening a file or server connection. The `Start()` function should either start a go routine or issue some other asynchronous mechanism for receiving metrics. The `Close()` function should tear down anything created in `Init()`.
|
||||
|
||||
Finally, the receiver needs to be registered in the `receiveManager.go`. There is a list of receivers called `AvailableReceivers` which is a map (`receiver_type_string` -> `pointer to Receiver interface`). Add a new entry with a descriptive name and the new receiver.
|
||||
Finally, the receiver needs to be registered in the `receiveManager.go`. There is a list of receivers called `AvailableReceivers` which is a map (`receiver_type_string` -> `pointer to NewReceiver function`). Add a new entry with a descriptive name and the new receiver.
|
||||
|
@ -1,9 +1,6 @@
|
||||
package receivers
|
||||
|
||||
import (
|
||||
// "time"
|
||||
"encoding/json"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
)
|
||||
|
||||
@ -20,13 +17,11 @@ type ReceiverConfig struct {
|
||||
}
|
||||
|
||||
type receiver struct {
|
||||
typename string
|
||||
name string
|
||||
sink chan lp.CCMetric
|
||||
name string
|
||||
sink chan lp.CCMetric
|
||||
}
|
||||
|
||||
type Receiver interface {
|
||||
Init(name string, config json.RawMessage) error
|
||||
Start()
|
||||
Close()
|
||||
Name() string
|
||||
|
@ -32,39 +32,6 @@ var DefaultTime = func() time.Time {
|
||||
return time.Unix(42, 0)
|
||||
}
|
||||
|
||||
func (r *NatsReceiver) Init(name string, config json.RawMessage) error {
|
||||
r.typename = "NatsReceiver"
|
||||
r.name = name
|
||||
r.config.Addr = nats.DefaultURL
|
||||
r.config.Port = "4222"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &r.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(r.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(r.config.Addr) == 0 ||
|
||||
len(r.config.Port) == 0 ||
|
||||
len(r.config.Subject) == 0 {
|
||||
return errors.New("not all configuration variables set required by NatsReceiver")
|
||||
}
|
||||
r.meta = map[string]string{"source": r.name}
|
||||
uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port)
|
||||
cclog.ComponentDebug(r.name, "INIT", uri, "Subject", r.config.Subject)
|
||||
nc, err := nats.Connect(uri)
|
||||
if err == nil {
|
||||
r.nc = nc
|
||||
} else {
|
||||
r.nc = nil
|
||||
return err
|
||||
}
|
||||
r.handler = influx.NewMetricHandler()
|
||||
r.parser = influx.NewParser(r.handler)
|
||||
r.parser.SetTimeFunc(DefaultTime)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *NatsReceiver) Start() {
|
||||
cclog.ComponentDebug(r.name, "START")
|
||||
r.nc.Subscribe(r.config.Subject, r._NatsReceive)
|
||||
@ -91,3 +58,35 @@ func (r *NatsReceiver) Close() {
|
||||
r.nc.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) {
|
||||
r := new(NatsReceiver)
|
||||
r.name = fmt.Sprintf("NatsReceiver(%s)", name)
|
||||
r.config.Addr = nats.DefaultURL
|
||||
r.config.Port = "4222"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &r.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(r.name, "Error reading config:", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(r.config.Addr) == 0 ||
|
||||
len(r.config.Port) == 0 ||
|
||||
len(r.config.Subject) == 0 {
|
||||
return nil, errors.New("not all configuration variables set required by NatsReceiver")
|
||||
}
|
||||
r.meta = map[string]string{"source": r.name}
|
||||
uri := fmt.Sprintf("%s:%s", r.config.Addr, r.config.Port)
|
||||
cclog.ComponentDebug(r.name, "NewNatsReceiver", uri, "Subject", r.config.Subject)
|
||||
if nc, err := nats.Connect(uri); err == nil {
|
||||
r.nc = nc
|
||||
} else {
|
||||
r.nc = nil
|
||||
return nil, err
|
||||
}
|
||||
r.handler = influx.NewMetricHandler()
|
||||
r.parser = influx.NewParser(r.handler)
|
||||
r.parser.SetTimeFunc(DefaultTime)
|
||||
return r, nil
|
||||
}
|
||||
|
@ -9,8 +9,8 @@ import (
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
)
|
||||
|
||||
var AvailableReceivers = map[string]Receiver{
|
||||
"nats": &NatsReceiver{},
|
||||
var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){
|
||||
"nats": NewNatsReceiver,
|
||||
}
|
||||
|
||||
type receiveManager struct {
|
||||
@ -30,11 +30,13 @@ type ReceiveManager interface {
|
||||
}
|
||||
|
||||
func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) error {
|
||||
// Initialize struct fields
|
||||
rm.inputs = make([]Receiver, 0)
|
||||
rm.output = nil
|
||||
rm.done = make(chan bool)
|
||||
rm.wg = wg
|
||||
rm.config = make([]json.RawMessage, 0)
|
||||
|
||||
configFile, err := os.Open(receiverConfigFile)
|
||||
if err != nil {
|
||||
cclog.ComponentError("ReceiveManager", err.Error())
|
||||
@ -51,6 +53,7 @@ func (rm *receiveManager) Init(wg *sync.WaitGroup, receiverConfigFile string) er
|
||||
for name, raw := range rawConfigs {
|
||||
rm.AddInput(name, raw)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -75,10 +78,9 @@ func (rm *receiveManager) AddInput(name string, rawConfig json.RawMessage) error
|
||||
cclog.ComponentError("ReceiveManager", "SKIP", config.Type, "unknown receiver:", err.Error())
|
||||
return err
|
||||
}
|
||||
r := AvailableReceivers[config.Type]
|
||||
err = r.Init(name, rawConfig)
|
||||
r, err := AvailableReceivers[config.Type](name, rawConfig)
|
||||
if err != nil {
|
||||
cclog.ComponentError("ReceiveManager", "SKIP", r.Name(), "initialization failed:", err.Error())
|
||||
cclog.ComponentError("ReceiveManager", "SKIP", name, "initialization failed:", err.Error())
|
||||
return err
|
||||
}
|
||||
rm.inputs = append(rm.inputs, r)
|
||||
|
@ -6,6 +6,7 @@ This folder contains the SinkManager and sink implementations for the cc-metric-
|
||||
- [`stdout`](./stdoutSink.md): Print all metrics to `stdout`, `stderr` or a file
|
||||
- [`http`](./httpSink.md): Send metrics to an HTTP server as POST requests
|
||||
- [`influxdb`](./influxSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database
|
||||
- [`influxasync`](./influxAsyncSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database with non-blocking write API
|
||||
- [`nats`](./natsSink.md): Publish metrics to the [NATS](https://nats.io/) network overlay system
|
||||
- [`ganglia`](./gangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) using the `gmetric` CLI tool
|
||||
- [`libganglia`](./libgangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) directly using `libganglia.so`
|
||||
@ -34,11 +35,12 @@ The configuration file for the sinks is a list of configurations. The `type` fie
|
||||
|
||||
|
||||
# Contributing own sinks
|
||||
A sink contains four functions and is derived from the type `sink`:
|
||||
* `Init(config json.RawMessage) error`
|
||||
A sink contains five functions and is derived from the type `sink`:
|
||||
* `Init(name string, config json.RawMessage) error`
|
||||
* `Write(point CCMetric) error`
|
||||
* `Flush() error`
|
||||
* `Close()`
|
||||
* `New<Typename>(name string, config json.RawMessage) (Sink, error)` (calls the `Init()` function)
|
||||
|
||||
The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function writes/sends the data. For non-blocking sinks, the `Flush()` method tells the sink to drain its internal buffers. The `Close()` function should tear down anything created in `Init()`.
|
||||
|
||||
@ -65,8 +67,8 @@ type SampleSink struct {
|
||||
}
|
||||
|
||||
// Initialize the sink by giving it a name and reading in the config JSON
|
||||
func (s *SampleSink) Init(config json.RawMessage) error {
|
||||
s.name = "SampleSink" // Always specify a name here
|
||||
func (s *SampleSink) Init(name string, config json.RawMessage) error {
|
||||
s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here
|
||||
// Read in the config JSON
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
@ -91,4 +93,13 @@ func (s *SampleSink) Flush() error {
|
||||
|
||||
// Close sink: close network connection, close files, close libraries, ...
|
||||
func (s *SampleSink) Close() {}
|
||||
|
||||
|
||||
// New function to create a new instance of the sink
|
||||
func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(SampleSink)
|
||||
err := s.Init(name, config)
|
||||
return s, err
|
||||
}
|
||||
|
||||
```
|
@ -34,42 +34,6 @@ type GangliaSink struct {
|
||||
config GangliaSinkConfig
|
||||
}
|
||||
|
||||
func (s *GangliaSink) Init(config json.RawMessage) error {
|
||||
var err error = nil
|
||||
s.name = "GangliaSink"
|
||||
s.config.AddTagsAsDesc = false
|
||||
s.config.AddGangliaGroup = false
|
||||
s.config.AddUnits = false
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
s.gmetric_path = ""
|
||||
s.gmetric_config = ""
|
||||
if len(s.config.GmetricPath) > 0 {
|
||||
p, err := exec.LookPath(s.config.GmetricPath)
|
||||
if err == nil {
|
||||
s.gmetric_path = p
|
||||
}
|
||||
}
|
||||
if len(s.gmetric_path) == 0 {
|
||||
p, err := exec.LookPath(string(GMETRIC_EXEC))
|
||||
if err == nil {
|
||||
s.gmetric_path = p
|
||||
}
|
||||
}
|
||||
if len(s.gmetric_path) == 0 {
|
||||
err = errors.New("cannot find executable 'gmetric'")
|
||||
}
|
||||
if len(s.config.GmetricConfig) > 0 {
|
||||
s.gmetric_config = s.config.GmetricConfig
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *GangliaSink) Write(point lp.CCMetric) error {
|
||||
var err error = nil
|
||||
//var tagsstr []string
|
||||
@ -127,4 +91,37 @@ func (s *GangliaSink) Flush() error {
|
||||
func (s *GangliaSink) Close() {
|
||||
}
|
||||
|
||||
func NewGangliaSink()
|
||||
func NewGangliaSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(GangliaSink)
|
||||
s.name = fmt.Sprintf("GangliaSink(%s)", name)
|
||||
s.config.AddTagsAsDesc = false
|
||||
s.config.AddGangliaGroup = false
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
s.gmetric_path = ""
|
||||
s.gmetric_config = ""
|
||||
if len(s.config.GmetricPath) > 0 {
|
||||
p, err := exec.LookPath(s.config.GmetricPath)
|
||||
if err == nil {
|
||||
s.gmetric_path = p
|
||||
}
|
||||
}
|
||||
if len(s.gmetric_path) == 0 {
|
||||
p, err := exec.LookPath(string(GMETRIC_EXEC))
|
||||
if err == nil {
|
||||
s.gmetric_path = p
|
||||
}
|
||||
}
|
||||
if len(s.gmetric_path) == 0 {
|
||||
return nil, errors.New("cannot find executable 'gmetric'")
|
||||
}
|
||||
if len(s.config.GmetricConfig) > 0 {
|
||||
s.gmetric_config = s.config.GmetricConfig
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
@ -38,57 +38,6 @@ type HttpSink struct {
|
||||
flushDelay time.Duration
|
||||
}
|
||||
|
||||
func (s *HttpSink) Init(config json.RawMessage) error {
|
||||
// Set default values
|
||||
s.name = "HttpSink"
|
||||
s.config.MaxIdleConns = 10
|
||||
s.config.IdleConnTimeout = "5s"
|
||||
s.config.Timeout = "5s"
|
||||
s.config.FlushDelay = "1s"
|
||||
|
||||
// Read config
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(s.config.URL) == 0 {
|
||||
return errors.New("`url` config option is required for HTTP sink")
|
||||
}
|
||||
if s.config.MaxIdleConns > 0 {
|
||||
s.maxIdleConns = s.config.MaxIdleConns
|
||||
}
|
||||
if len(s.config.IdleConnTimeout) > 0 {
|
||||
t, err := time.ParseDuration(s.config.IdleConnTimeout)
|
||||
if err == nil {
|
||||
s.idleConnTimeout = t
|
||||
}
|
||||
}
|
||||
if len(s.config.Timeout) > 0 {
|
||||
t, err := time.ParseDuration(s.config.Timeout)
|
||||
if err == nil {
|
||||
s.timeout = t
|
||||
}
|
||||
}
|
||||
if len(s.config.FlushDelay) > 0 {
|
||||
t, err := time.ParseDuration(s.config.FlushDelay)
|
||||
if err == nil {
|
||||
s.flushDelay = t
|
||||
}
|
||||
}
|
||||
tr := &http.Transport{
|
||||
MaxIdleConns: s.maxIdleConns,
|
||||
IdleConnTimeout: s.idleConnTimeout,
|
||||
}
|
||||
s.client = &http.Client{Transport: tr, Timeout: s.timeout}
|
||||
s.buffer = &bytes.Buffer{}
|
||||
s.encoder = influx.NewEncoder(s.buffer)
|
||||
s.encoder.SetPrecision(time.Second)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *HttpSink) Write(m lp.CCMetric) error {
|
||||
if s.buffer.Len() == 0 && s.flushDelay != 0 {
|
||||
// This is the first write since the last flush, start the flushTimer!
|
||||
@ -169,3 +118,54 @@ func (s *HttpSink) Close() {
|
||||
}
|
||||
s.client.CloseIdleConnections()
|
||||
}
|
||||
|
||||
func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(HttpSink)
|
||||
// Set default values
|
||||
s.name = fmt.Sprintf("HttpSink(%s)", name)
|
||||
s.config.MaxIdleConns = 10
|
||||
s.config.IdleConnTimeout = "5s"
|
||||
s.config.Timeout = "5s"
|
||||
s.config.FlushDelay = "1s"
|
||||
|
||||
// Read config
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(s.config.URL) == 0 {
|
||||
return nil, errors.New("`url` config option is required for HTTP sink")
|
||||
}
|
||||
if s.config.MaxIdleConns > 0 {
|
||||
s.maxIdleConns = s.config.MaxIdleConns
|
||||
}
|
||||
if len(s.config.IdleConnTimeout) > 0 {
|
||||
t, err := time.ParseDuration(s.config.IdleConnTimeout)
|
||||
if err == nil {
|
||||
s.idleConnTimeout = t
|
||||
}
|
||||
}
|
||||
if len(s.config.Timeout) > 0 {
|
||||
t, err := time.ParseDuration(s.config.Timeout)
|
||||
if err == nil {
|
||||
s.timeout = t
|
||||
}
|
||||
}
|
||||
if len(s.config.FlushDelay) > 0 {
|
||||
t, err := time.ParseDuration(s.config.FlushDelay)
|
||||
if err == nil {
|
||||
s.flushDelay = t
|
||||
}
|
||||
}
|
||||
tr := &http.Transport{
|
||||
MaxIdleConns: s.maxIdleConns,
|
||||
IdleConnTimeout: s.idleConnTimeout,
|
||||
}
|
||||
s.client = &http.Client{Transport: tr, Timeout: s.timeout}
|
||||
s.buffer = &bytes.Buffer{}
|
||||
s.encoder = influx.NewEncoder(s.buffer)
|
||||
s.encoder.SetPrecision(time.Second)
|
||||
return s, nil
|
||||
}
|
||||
|
@ -30,11 +30,10 @@ type InfluxAsyncSinkConfig struct {
|
||||
|
||||
type InfluxAsyncSink struct {
|
||||
sink
|
||||
client influxdb2.Client
|
||||
writeApi influxdb2Api.WriteAPI
|
||||
retPolicy string
|
||||
errors <-chan error
|
||||
config InfluxAsyncSinkConfig
|
||||
client influxdb2.Client
|
||||
writeApi influxdb2Api.WriteAPI
|
||||
errors <-chan error
|
||||
config InfluxAsyncSinkConfig
|
||||
}
|
||||
|
||||
func (s *InfluxAsyncSink) connect() error {
|
||||
@ -68,39 +67,6 @@ func (s *InfluxAsyncSink) connect() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *InfluxAsyncSink) Init(config json.RawMessage) error {
|
||||
s.name = "InfluxSink"
|
||||
|
||||
// Set default for maximum number of points sent to server in single request.
|
||||
s.config.BatchSize = 100
|
||||
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 ||
|
||||
len(s.config.Organization) == 0 ||
|
||||
len(s.config.Password) == 0 {
|
||||
return errors.New("not all configuration variables set required by InfluxAsyncSink")
|
||||
}
|
||||
|
||||
// Connect to InfluxDB server
|
||||
err := s.connect()
|
||||
|
||||
// Start background: Read from error channel
|
||||
s.errors = s.writeApi.Errors()
|
||||
go func() {
|
||||
for err := range s.errors {
|
||||
cclog.ComponentError(s.name, err.Error())
|
||||
}
|
||||
}()
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
|
||||
s.writeApi.WritePoint(
|
||||
m.ToPoint(s.config.MetaAsTags),
|
||||
@ -118,3 +84,40 @@ func (s *InfluxAsyncSink) Close() {
|
||||
s.writeApi.Flush()
|
||||
s.client.Close()
|
||||
}
|
||||
|
||||
func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(InfluxAsyncSink)
|
||||
s.name = fmt.Sprintf("InfluxSink(%s)", name)
|
||||
|
||||
// Set default for maximum number of points sent to server in single request.
|
||||
s.config.BatchSize = 100
|
||||
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 ||
|
||||
len(s.config.Organization) == 0 ||
|
||||
len(s.config.Password) == 0 {
|
||||
return nil, errors.New("not all configuration variables set required by InfluxAsyncSink")
|
||||
}
|
||||
|
||||
// Connect to InfluxDB server
|
||||
if err := s.connect(); err != nil {
|
||||
return nil, fmt.Errorf("Unable to connect: %v", err)
|
||||
}
|
||||
|
||||
// Start background: Read from error channel
|
||||
s.errors = s.writeApi.Errors()
|
||||
go func() {
|
||||
for err := range s.errors {
|
||||
cclog.ComponentError(s.name, err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
@ -57,26 +57,6 @@ func (s *InfluxSink) connect() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *InfluxSink) Init(config json.RawMessage) error {
|
||||
s.name = "InfluxSink"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 ||
|
||||
len(s.config.Organization) == 0 ||
|
||||
len(s.config.Password) == 0 {
|
||||
return errors.New("not all configuration variables set required by InfluxSink")
|
||||
}
|
||||
|
||||
// Connect to InfluxDB server
|
||||
return s.connect()
|
||||
}
|
||||
|
||||
func (s *InfluxSink) Write(m lp.CCMetric) error {
|
||||
err :=
|
||||
s.writeApi.WritePoint(
|
||||
@ -94,3 +74,27 @@ func (s *InfluxSink) Close() {
|
||||
cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
|
||||
s.client.Close()
|
||||
}
|
||||
|
||||
func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(InfluxSink)
|
||||
s.name = fmt.Sprintf("InfluxSink(%s)", name)
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 ||
|
||||
len(s.config.Organization) == 0 ||
|
||||
len(s.config.Password) == 0 {
|
||||
return nil, errors.New("not all configuration variables set required by InfluxSink")
|
||||
}
|
||||
|
||||
// Connect to InfluxDB server
|
||||
if err := s.connect(); err != nil {
|
||||
return nil, fmt.Errorf("Unable to connect: %v", err)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
@ -109,65 +109,6 @@ type LibgangliaSink struct {
|
||||
cstrCache map[string]*C.char
|
||||
}
|
||||
|
||||
func (s *LibgangliaSink) Init(config json.RawMessage) error {
|
||||
var err error = nil
|
||||
s.name = "LibgangliaSink"
|
||||
//s.config.AddTagsAsDesc = false
|
||||
s.config.AddGangliaGroup = false
|
||||
s.config.AddTypeToName = false
|
||||
s.config.AddUnits = true
|
||||
s.config.GmondConfig = string(GMOND_CONFIG_FILE)
|
||||
s.config.GangliaLib = string(GANGLIA_LIB_NAME)
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config:", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
|
||||
if lib == nil {
|
||||
return fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)
|
||||
}
|
||||
err = lib.Open()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
|
||||
}
|
||||
|
||||
// Set up cache for the C strings
|
||||
s.cstrCache = make(map[string]*C.char)
|
||||
// s.cstrCache["globals"] = C.CString("globals")
|
||||
|
||||
// s.cstrCache["override_hostname"] = C.CString("override_hostname")
|
||||
// s.cstrCache["override_ip"] = C.CString("override_ip")
|
||||
|
||||
// Add some constant strings
|
||||
s.cstrCache["GROUP"] = C.CString("GROUP")
|
||||
s.cstrCache["CLUSTER"] = C.CString("CLUSTER")
|
||||
s.cstrCache[""] = C.CString("")
|
||||
|
||||
// Add cluster name for lookup in Write()
|
||||
if len(s.config.ClusterName) > 0 {
|
||||
s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName)
|
||||
}
|
||||
// Add supported types for later lookup in Write()
|
||||
s.cstrCache["double"] = C.CString("double")
|
||||
s.cstrCache["int32"] = C.CString("int32")
|
||||
s.cstrCache["string"] = C.CString("string")
|
||||
|
||||
// Create Ganglia pool
|
||||
s.global_context = C.Ganglia_pool_create(nil)
|
||||
// Load Ganglia configuration
|
||||
s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig)
|
||||
s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0)
|
||||
//globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"])
|
||||
//override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"])
|
||||
//override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"])
|
||||
|
||||
s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *LibgangliaSink) Write(point lp.CCMetric) error {
|
||||
var err error = nil
|
||||
var c_name *C.char
|
||||
@ -283,3 +224,63 @@ func (s *LibgangliaSink) Close() {
|
||||
C.free(unsafe.Pointer(cstr))
|
||||
}
|
||||
}
|
||||
|
||||
func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(LibgangliaSink)
|
||||
var err error = nil
|
||||
s.name = fmt.Sprintf("LibgangliaSink(%s)", name)
|
||||
//s.config.AddTagsAsDesc = false
|
||||
s.config.AddGangliaGroup = false
|
||||
s.config.AddTypeToName = false
|
||||
s.config.AddUnits = true
|
||||
s.config.GmondConfig = string(GMOND_CONFIG_FILE)
|
||||
s.config.GangliaLib = string(GANGLIA_LIB_NAME)
|
||||
if len(config) > 0 {
|
||||
err = json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config:", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS)
|
||||
if lib == nil {
|
||||
return nil, fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib)
|
||||
}
|
||||
err = lib.Open()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err)
|
||||
}
|
||||
|
||||
// Set up cache for the C strings
|
||||
s.cstrCache = make(map[string]*C.char)
|
||||
// s.cstrCache["globals"] = C.CString("globals")
|
||||
|
||||
// s.cstrCache["override_hostname"] = C.CString("override_hostname")
|
||||
// s.cstrCache["override_ip"] = C.CString("override_ip")
|
||||
|
||||
// Add some constant strings
|
||||
s.cstrCache["GROUP"] = C.CString("GROUP")
|
||||
s.cstrCache["CLUSTER"] = C.CString("CLUSTER")
|
||||
s.cstrCache[""] = C.CString("")
|
||||
|
||||
// Add cluster name for lookup in Write()
|
||||
if len(s.config.ClusterName) > 0 {
|
||||
s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName)
|
||||
}
|
||||
// Add supported types for later lookup in Write()
|
||||
s.cstrCache["double"] = C.CString("double")
|
||||
s.cstrCache["int32"] = C.CString("int32")
|
||||
s.cstrCache["string"] = C.CString("string")
|
||||
|
||||
// Create Ganglia pool
|
||||
s.global_context = C.Ganglia_pool_create(nil)
|
||||
// Load Ganglia configuration
|
||||
s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig)
|
||||
s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0)
|
||||
//globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"])
|
||||
//override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"])
|
||||
//override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"])
|
||||
|
||||
s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config)
|
||||
return s, nil
|
||||
}
|
||||
|
@ -1,8 +1,6 @@
|
||||
package sinks
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
|
||||
)
|
||||
|
||||
@ -17,7 +15,6 @@ type sink struct {
|
||||
}
|
||||
|
||||
type Sink interface {
|
||||
Init(config json.RawMessage) error
|
||||
Write(point lp.CCMetric) error
|
||||
Flush() error
|
||||
Close()
|
||||
|
@ -53,30 +53,6 @@ func (s *NatsSink) connect() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *NatsSink) Init(config json.RawMessage) error {
|
||||
s.name = "NatsSink"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 {
|
||||
return errors.New("not all configuration variables set required by NatsSink")
|
||||
}
|
||||
// Setup Influx line protocol
|
||||
s.buffer = &bytes.Buffer{}
|
||||
s.buffer.Grow(1025)
|
||||
s.encoder = influx.NewEncoder(s.buffer)
|
||||
s.encoder.SetPrecision(time.Second)
|
||||
s.encoder.SetMaxLineBytes(1024)
|
||||
// Setup infos for connection
|
||||
return s.connect()
|
||||
}
|
||||
|
||||
func (s *NatsSink) Write(m lp.CCMetric) error {
|
||||
if s.client != nil {
|
||||
_, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags))
|
||||
@ -105,3 +81,31 @@ func (s *NatsSink) Close() {
|
||||
s.client.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(NatsSink)
|
||||
s.name = fmt.Sprintf("NatsSink(%s)", name)
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(s.config.Host) == 0 ||
|
||||
len(s.config.Port) == 0 ||
|
||||
len(s.config.Database) == 0 {
|
||||
return nil, errors.New("not all configuration variables set required by NatsSink")
|
||||
}
|
||||
// Setup Influx line protocol
|
||||
s.buffer = &bytes.Buffer{}
|
||||
s.buffer.Grow(1025)
|
||||
s.encoder = influx.NewEncoder(s.buffer)
|
||||
s.encoder.SetPrecision(time.Second)
|
||||
s.encoder.SetMaxLineBytes(1024)
|
||||
// Setup infos for connection
|
||||
if err := s.connect(); err != nil {
|
||||
return nil, fmt.Errorf("Unable to connect: %v", err)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
@ -13,14 +13,14 @@ import (
|
||||
const SINK_MAX_FORWARD = 50
|
||||
|
||||
// Map of all available sinks
|
||||
var AvailableSinks = map[string]Sink{
|
||||
"influxdb": new(InfluxSink),
|
||||
"stdout": new(StdoutSink),
|
||||
"nats": new(NatsSink),
|
||||
"http": new(HttpSink),
|
||||
"ganglia": new(GangliaSink),
|
||||
"influxasync": new(InfluxAsyncSink),
|
||||
"libganglia": new(LibgangliaSink),
|
||||
var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink, error){
|
||||
"ganglia": NewGangliaSink,
|
||||
"libganglia": NewLibgangliaSink,
|
||||
"stdout": NewStdoutSink,
|
||||
"nats": NewNatsSink,
|
||||
"influxdb": NewInfluxSink,
|
||||
"influxasync": NewInfluxAsyncSink,
|
||||
"http": NewHttpSink,
|
||||
}
|
||||
|
||||
// Metric collector manager data structure
|
||||
@ -149,8 +149,7 @@ func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error {
|
||||
cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type)
|
||||
return err
|
||||
}
|
||||
s := AvailableSinks[sinkConfig.Type]
|
||||
err = s.Init(rawConfig)
|
||||
s, err := AvailableSinks[sinkConfig.Type](name, rawConfig)
|
||||
if err != nil {
|
||||
cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error())
|
||||
return err
|
||||
|
@ -19,34 +19,6 @@ type StdoutSink struct {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StdoutSink) Init(config json.RawMessage) error {
|
||||
s.name = "StdoutSink"
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
s.output = os.Stdout
|
||||
if len(s.config.Output) > 0 {
|
||||
switch strings.ToLower(s.config.Output) {
|
||||
case "stdout":
|
||||
s.output = os.Stdout
|
||||
case "stderr":
|
||||
s.output = os.Stderr
|
||||
default:
|
||||
f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.output = f
|
||||
}
|
||||
}
|
||||
s.meta_as_tags = s.config.MetaAsTags
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StdoutSink) Write(m lp.CCMetric) error {
|
||||
fmt.Fprint(
|
||||
s.output,
|
||||
@ -65,3 +37,33 @@ func (s *StdoutSink) Close() {
|
||||
s.output.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func NewStdoutSink(name string, config json.RawMessage) (Sink, error) {
|
||||
s := new(StdoutSink)
|
||||
s.name = fmt.Sprintf("StdoutSink(%s)", name)
|
||||
if len(config) > 0 {
|
||||
err := json.Unmarshal(config, &s.config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
s.output = os.Stdout
|
||||
if len(s.config.Output) > 0 {
|
||||
switch strings.ToLower(s.config.Output) {
|
||||
case "stdout":
|
||||
s.output = os.Stdout
|
||||
case "stderr":
|
||||
s.output = os.Stderr
|
||||
default:
|
||||
f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.output = f
|
||||
}
|
||||
}
|
||||
s.meta_as_tags = s.config.MetaAsTags
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user