From 18a226183c92cf10c59be029d1667761ba592f23 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 22 Feb 2022 16:15:25 +0100 Subject: [PATCH 1/3] Use new sink instances to allow multiple of same sink type --- sinks/gangliaSink.go | 10 +++++++--- sinks/httpSink.go | 10 ++++++++-- sinks/influxAsyncSink.go | 10 ++++++++-- sinks/influxSink.go | 10 ++++++++-- sinks/libgangliaSink.go | 10 ++++++++-- sinks/metricSink.go | 2 +- sinks/natsSink.go | 10 ++++++++-- sinks/sinkManager.go | 19 +++++++++---------- sinks/stdoutSink.go | 10 ++++++++-- 9 files changed, 65 insertions(+), 26 deletions(-) diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index ae53dd8..0c9459b 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -33,9 +33,9 @@ type GangliaSink struct { config GangliaSinkConfig } -func (s *GangliaSink) Init(config json.RawMessage) error { +func (s *GangliaSink) Init(name string, config json.RawMessage) error { var err error = nil - s.name = "GangliaSink" + s.name = fmt.Sprintf("GangliaSink(%s)", name) s.config.AddTagsAsDesc = false s.config.AddGangliaGroup = false if len(config) > 0 { @@ -168,4 +168,8 @@ func (s *GangliaSink) Flush() error { func (s *GangliaSink) Close() { } -func NewGangliaSink() +func NewGangliaSink(name string, config json.RawMessage) (Sink, error) { + s := new(GangliaSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/httpSink.go b/sinks/httpSink.go index ce46bab..41a5919 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -38,9 +38,9 @@ type HttpSink struct { flushDelay time.Duration } -func (s *HttpSink) Init(config json.RawMessage) error { +func (s *HttpSink) Init(name string, config json.RawMessage) error { // Set default values - s.name = "HttpSink" + s.name = fmt.Sprintf("HttpSink(%s)", name) s.config.MaxIdleConns = 10 s.config.IdleConnTimeout = "5s" s.config.Timeout = "5s" @@ -169,3 +169,9 @@ func (s *HttpSink) Close() { } s.client.CloseIdleConnections() } + +func NewHttpSink(name string, config json.RawMessage) (Sink, error) { + s := new(HttpSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index 20aa60c..e2f8995 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -68,8 +68,8 @@ func (s *InfluxAsyncSink) connect() error { return nil } -func (s *InfluxAsyncSink) Init(config json.RawMessage) error { - s.name = "InfluxSink" +func (s *InfluxAsyncSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("InfluxSink(%s)", name) // Set default for maximum number of points sent to server in single request. s.config.BatchSize = 100 @@ -118,3 +118,9 @@ func (s *InfluxAsyncSink) Close() { s.writeApi.Flush() s.client.Close() } + +func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { + s := new(InfluxAsyncSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 99304c0..f235054 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -57,8 +57,8 @@ func (s *InfluxSink) connect() error { return nil } -func (s *InfluxSink) Init(config json.RawMessage) error { - s.name = "InfluxSink" +func (s *InfluxSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("InfluxSink(%s)", name) if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -94,3 +94,9 @@ func (s *InfluxSink) Close() { cclog.ComponentDebug(s.name, "Closing InfluxDB connection") s.client.Close() } + +func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { + s := new(InfluxSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/libgangliaSink.go b/sinks/libgangliaSink.go index 7136e42..9a45df4 100644 --- a/sinks/libgangliaSink.go +++ b/sinks/libgangliaSink.go @@ -109,9 +109,9 @@ type LibgangliaSink struct { cstrCache map[string]*C.char } -func (s *LibgangliaSink) Init(config json.RawMessage) error { +func (s *LibgangliaSink) Init(name string, config json.RawMessage) error { var err error = nil - s.name = "LibgangliaSink" + s.name = fmt.Sprintf("LibgangliaSink(%s)", name) //s.config.AddTagsAsDesc = false s.config.AddGangliaGroup = false s.config.AddTypeToName = false @@ -316,3 +316,9 @@ func (s *LibgangliaSink) Close() { C.free(unsafe.Pointer(cstr)) } } + +func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) { + s := new(LibgangliaSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/metricSink.go b/sinks/metricSink.go index d76f5f2..4583a2c 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -17,7 +17,7 @@ type sink struct { } type Sink interface { - Init(config json.RawMessage) error + Init(name string, config json.RawMessage) error Write(point lp.CCMetric) error Flush() error Close() diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 187157e..6087da0 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -53,8 +53,8 @@ func (s *NatsSink) connect() error { return nil } -func (s *NatsSink) Init(config json.RawMessage) error { - s.name = "NatsSink" +func (s *NatsSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("NatsSink(%s)", name) if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -105,3 +105,9 @@ func (s *NatsSink) Close() { s.client.Close() } } + +func NewNatsSink(name string, config json.RawMessage) (Sink, error) { + s := new(NatsSink) + s.Init(name, config) + return s, nil +} diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 487e7ca..f531f5d 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -13,14 +13,14 @@ import ( const SINK_MAX_FORWARD = 50 // Map of all available sinks -var AvailableSinks = map[string]Sink{ - "influxdb": new(InfluxSink), - "stdout": new(StdoutSink), - "nats": new(NatsSink), - "http": new(HttpSink), - "ganglia": new(GangliaSink), - "influxasync": new(InfluxAsyncSink), - "libganglia": new(LibgangliaSink), +var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink, error){ + "ganglia": NewGangliaSink, + "libganglia": NewLibgangliaSink, + "stdout": NewStdoutSink, + "nats": NewNatsSink, + "influxdb": NewInfluxSink, + "influxasync": NewInfluxAsyncSink, + "http": NewHttpSink, } // Metric collector manager data structure @@ -149,8 +149,7 @@ func (sm *sinkManager) AddOutput(name string, rawConfig json.RawMessage) error { cclog.ComponentError("SinkManager", "SKIP", name, "unknown sink:", sinkConfig.Type) return err } - s := AvailableSinks[sinkConfig.Type] - err = s.Init(rawConfig) + s, err := AvailableSinks[sinkConfig.Type](name, rawConfig) if err != nil { cclog.ComponentError("SinkManager", "SKIP", s.Name(), "initialization failed:", err.Error()) return err diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index 5d0761a..d6c2e1b 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -19,8 +19,8 @@ type StdoutSink struct { } } -func (s *StdoutSink) Init(config json.RawMessage) error { - s.name = "StdoutSink" +func (s *StdoutSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("StdoutSink(%s)", name) if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -65,3 +65,9 @@ func (s *StdoutSink) Close() { s.output.Close() } } + +func NewStdoutSink(name string, config json.RawMessage) (Sink, error) { + s := new(StdoutSink) + s.Init(name, config) + return s, nil +} From 24e12ccc57eeb71dc9aafb43cb21dec935eb0fdc Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 22 Feb 2022 16:19:46 +0100 Subject: [PATCH 2/3] Update sink README and SampleSink --- sinks/README.md | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/sinks/README.md b/sinks/README.md index 8ff3743..65d2851 100644 --- a/sinks/README.md +++ b/sinks/README.md @@ -6,6 +6,7 @@ This folder contains the SinkManager and sink implementations for the cc-metric- - [`stdout`](./stdoutSink.md): Print all metrics to `stdout`, `stderr` or a file - [`http`](./httpSink.md): Send metrics to an HTTP server as POST requests - [`influxdb`](./influxSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database +- [`influxasync`](./influxAsyncSink.md): Send metrics to an [InfluxDB](https://www.influxdata.com/products/influxdb/) database with non-blocking write API - [`nats`](./natsSink.md): Publish metrics to the [NATS](https://nats.io/) network overlay system - [`ganglia`](./gangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) using the `gmetric` CLI tool - [`libganglia`](./libgangliaSink.md): Publish metrics in the [Ganglia Monitoring System](http://ganglia.info/) directly using `libganglia.so` @@ -34,11 +35,12 @@ The configuration file for the sinks is a list of configurations. The `type` fie # Contributing own sinks -A sink contains four functions and is derived from the type `sink`: -* `Init(config json.RawMessage) error` +A sink contains five functions and is derived from the type `sink`: +* `Init(name string, config json.RawMessage) error` * `Write(point CCMetric) error` * `Flush() error` * `Close()` +* `New(name string, config json.RawMessage) (Sink, error)` (calls the `Init()` function) The data structures should be set up in `Init()` like opening a file or server connection. The `Write()` function writes/sends the data. For non-blocking sinks, the `Flush()` method tells the sink to drain its internal buffers. The `Close()` function should tear down anything created in `Init()`. @@ -65,8 +67,8 @@ type SampleSink struct { } // Initialize the sink by giving it a name and reading in the config JSON -func (s *SampleSink) Init(config json.RawMessage) error { - s.name = "SampleSink" // Always specify a name here +func (s *SampleSink) Init(name string, config json.RawMessage) error { + s.name = fmt.Sprintf("SampleSink(%s)", name) // Always specify a name here // Read in the config JSON if len(config) > 0 { err := json.Unmarshal(config, &s.config) @@ -91,4 +93,13 @@ func (s *SampleSink) Flush() error { // Close sink: close network connection, close files, close libraries, ... func (s *SampleSink) Close() {} + + +// New function to create a new instance of the sink +func NewSampleSink(name string, config json.RawMessage) (Sink, error) { + s := new(SampleSink) + err := s.Init(name, config) + return s, err +} + ``` \ No newline at end of file From 73981527d3392408a3be52194e8dadd252e9406d Mon Sep 17 00:00:00 2001 From: Holger Obermaier Date: Wed, 23 Feb 2022 14:56:29 +0100 Subject: [PATCH 3/3] Refactor: Embed Init() into New() function --- sinks/gangliaSink.go | 66 ++++++++++------------ sinks/httpSink.go | 98 ++++++++++++++++----------------- sinks/influxAsyncSink.go | 75 ++++++++++++------------- sinks/influxSink.go | 40 +++++++------- sinks/libgangliaSink.go | 115 +++++++++++++++++++-------------------- sinks/metricSink.go | 3 - sinks/natsSink.go | 48 ++++++++-------- sinks/stdoutSink.go | 54 +++++++++--------- 8 files changed, 234 insertions(+), 265 deletions(-) diff --git a/sinks/gangliaSink.go b/sinks/gangliaSink.go index 0c9459b..8431011 100644 --- a/sinks/gangliaSink.go +++ b/sinks/gangliaSink.go @@ -33,41 +33,6 @@ type GangliaSink struct { config GangliaSinkConfig } -func (s *GangliaSink) Init(name string, config json.RawMessage) error { - var err error = nil - s.name = fmt.Sprintf("GangliaSink(%s)", name) - s.config.AddTagsAsDesc = false - s.config.AddGangliaGroup = false - if len(config) > 0 { - err := json.Unmarshal(config, &s.config) - if err != nil { - cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error()) - return err - } - } - s.gmetric_path = "" - s.gmetric_config = "" - if len(s.config.GmetricPath) > 0 { - p, err := exec.LookPath(s.config.GmetricPath) - if err == nil { - s.gmetric_path = p - } - } - if len(s.gmetric_path) == 0 { - p, err := exec.LookPath(string(GMETRIC_EXEC)) - if err == nil { - s.gmetric_path = p - } - } - if len(s.gmetric_path) == 0 { - err = errors.New("cannot find executable 'gmetric'") - } - if len(s.config.GmetricConfig) > 0 { - s.gmetric_config = s.config.GmetricConfig - } - return err -} - func (s *GangliaSink) Write(point lp.CCMetric) error { var err error = nil var tagsstr []string @@ -170,6 +135,35 @@ func (s *GangliaSink) Close() { func NewGangliaSink(name string, config json.RawMessage) (Sink, error) { s := new(GangliaSink) - s.Init(name, config) + s.name = fmt.Sprintf("GangliaSink(%s)", name) + s.config.AddTagsAsDesc = false + s.config.AddGangliaGroup = false + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error()) + return nil, err + } + } + s.gmetric_path = "" + s.gmetric_config = "" + if len(s.config.GmetricPath) > 0 { + p, err := exec.LookPath(s.config.GmetricPath) + if err == nil { + s.gmetric_path = p + } + } + if len(s.gmetric_path) == 0 { + p, err := exec.LookPath(string(GMETRIC_EXEC)) + if err == nil { + s.gmetric_path = p + } + } + if len(s.gmetric_path) == 0 { + return nil, errors.New("cannot find executable 'gmetric'") + } + if len(s.config.GmetricConfig) > 0 { + s.gmetric_config = s.config.GmetricConfig + } return s, nil } diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 41a5919..c2dd2ea 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -38,57 +38,6 @@ type HttpSink struct { flushDelay time.Duration } -func (s *HttpSink) Init(name string, config json.RawMessage) error { - // Set default values - s.name = fmt.Sprintf("HttpSink(%s)", name) - s.config.MaxIdleConns = 10 - s.config.IdleConnTimeout = "5s" - s.config.Timeout = "5s" - s.config.FlushDelay = "1s" - - // Read config - if len(config) > 0 { - err := json.Unmarshal(config, &s.config) - if err != nil { - return err - } - } - if len(s.config.URL) == 0 { - return errors.New("`url` config option is required for HTTP sink") - } - if s.config.MaxIdleConns > 0 { - s.maxIdleConns = s.config.MaxIdleConns - } - if len(s.config.IdleConnTimeout) > 0 { - t, err := time.ParseDuration(s.config.IdleConnTimeout) - if err == nil { - s.idleConnTimeout = t - } - } - if len(s.config.Timeout) > 0 { - t, err := time.ParseDuration(s.config.Timeout) - if err == nil { - s.timeout = t - } - } - if len(s.config.FlushDelay) > 0 { - t, err := time.ParseDuration(s.config.FlushDelay) - if err == nil { - s.flushDelay = t - } - } - tr := &http.Transport{ - MaxIdleConns: s.maxIdleConns, - IdleConnTimeout: s.idleConnTimeout, - } - s.client = &http.Client{Transport: tr, Timeout: s.timeout} - s.buffer = &bytes.Buffer{} - s.encoder = influx.NewEncoder(s.buffer) - s.encoder.SetPrecision(time.Second) - - return nil -} - func (s *HttpSink) Write(m lp.CCMetric) error { if s.buffer.Len() == 0 && s.flushDelay != 0 { // This is the first write since the last flush, start the flushTimer! @@ -172,6 +121,51 @@ func (s *HttpSink) Close() { func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s := new(HttpSink) - s.Init(name, config) + // Set default values + s.name = fmt.Sprintf("HttpSink(%s)", name) + s.config.MaxIdleConns = 10 + s.config.IdleConnTimeout = "5s" + s.config.Timeout = "5s" + s.config.FlushDelay = "1s" + + // Read config + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return nil, err + } + } + if len(s.config.URL) == 0 { + return nil, errors.New("`url` config option is required for HTTP sink") + } + if s.config.MaxIdleConns > 0 { + s.maxIdleConns = s.config.MaxIdleConns + } + if len(s.config.IdleConnTimeout) > 0 { + t, err := time.ParseDuration(s.config.IdleConnTimeout) + if err == nil { + s.idleConnTimeout = t + } + } + if len(s.config.Timeout) > 0 { + t, err := time.ParseDuration(s.config.Timeout) + if err == nil { + s.timeout = t + } + } + if len(s.config.FlushDelay) > 0 { + t, err := time.ParseDuration(s.config.FlushDelay) + if err == nil { + s.flushDelay = t + } + } + tr := &http.Transport{ + MaxIdleConns: s.maxIdleConns, + IdleConnTimeout: s.idleConnTimeout, + } + s.client = &http.Client{Transport: tr, Timeout: s.timeout} + s.buffer = &bytes.Buffer{} + s.encoder = influx.NewEncoder(s.buffer) + s.encoder.SetPrecision(time.Second) return s, nil } diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index e2f8995..81b7f78 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -30,11 +30,10 @@ type InfluxAsyncSinkConfig struct { type InfluxAsyncSink struct { sink - client influxdb2.Client - writeApi influxdb2Api.WriteAPI - retPolicy string - errors <-chan error - config InfluxAsyncSinkConfig + client influxdb2.Client + writeApi influxdb2Api.WriteAPI + errors <-chan error + config InfluxAsyncSinkConfig } func (s *InfluxAsyncSink) connect() error { @@ -68,39 +67,6 @@ func (s *InfluxAsyncSink) connect() error { return nil } -func (s *InfluxAsyncSink) Init(name string, config json.RawMessage) error { - s.name = fmt.Sprintf("InfluxSink(%s)", name) - - // Set default for maximum number of points sent to server in single request. - s.config.BatchSize = 100 - - if len(config) > 0 { - err := json.Unmarshal(config, &s.config) - if err != nil { - return err - } - } - if len(s.config.Host) == 0 || - len(s.config.Port) == 0 || - len(s.config.Database) == 0 || - len(s.config.Organization) == 0 || - len(s.config.Password) == 0 { - return errors.New("not all configuration variables set required by InfluxAsyncSink") - } - - // Connect to InfluxDB server - err := s.connect() - - // Start background: Read from error channel - s.errors = s.writeApi.Errors() - go func() { - for err := range s.errors { - cclog.ComponentError(s.name, err.Error()) - } - }() - return err -} - func (s *InfluxAsyncSink) Write(m lp.CCMetric) error { s.writeApi.WritePoint( m.ToPoint(s.config.MetaAsTags), @@ -121,6 +87,37 @@ func (s *InfluxAsyncSink) Close() { func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { s := new(InfluxAsyncSink) - s.Init(name, config) + s.name = fmt.Sprintf("InfluxSink(%s)", name) + + // Set default for maximum number of points sent to server in single request. + s.config.BatchSize = 100 + + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return nil, err + } + } + if len(s.config.Host) == 0 || + len(s.config.Port) == 0 || + len(s.config.Database) == 0 || + len(s.config.Organization) == 0 || + len(s.config.Password) == 0 { + return nil, errors.New("not all configuration variables set required by InfluxAsyncSink") + } + + // Connect to InfluxDB server + if err := s.connect(); err != nil { + return nil, fmt.Errorf("Unable to connect: %v", err) + } + + // Start background: Read from error channel + s.errors = s.writeApi.Errors() + go func() { + for err := range s.errors { + cclog.ComponentError(s.name, err.Error()) + } + }() + return s, nil } diff --git a/sinks/influxSink.go b/sinks/influxSink.go index f235054..d156585 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -57,26 +57,6 @@ func (s *InfluxSink) connect() error { return nil } -func (s *InfluxSink) Init(name string, config json.RawMessage) error { - s.name = fmt.Sprintf("InfluxSink(%s)", name) - if len(config) > 0 { - err := json.Unmarshal(config, &s.config) - if err != nil { - return err - } - } - if len(s.config.Host) == 0 || - len(s.config.Port) == 0 || - len(s.config.Database) == 0 || - len(s.config.Organization) == 0 || - len(s.config.Password) == 0 { - return errors.New("not all configuration variables set required by InfluxSink") - } - - // Connect to InfluxDB server - return s.connect() -} - func (s *InfluxSink) Write(m lp.CCMetric) error { err := s.writeApi.WritePoint( @@ -97,6 +77,24 @@ func (s *InfluxSink) Close() { func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { s := new(InfluxSink) - s.Init(name, config) + s.name = fmt.Sprintf("InfluxSink(%s)", name) + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return nil, err + } + } + if len(s.config.Host) == 0 || + len(s.config.Port) == 0 || + len(s.config.Database) == 0 || + len(s.config.Organization) == 0 || + len(s.config.Password) == 0 { + return nil, errors.New("not all configuration variables set required by InfluxSink") + } + + // Connect to InfluxDB server + if err := s.connect(); err != nil { + return nil, fmt.Errorf("Unable to connect: %v", err) + } return s, nil } diff --git a/sinks/libgangliaSink.go b/sinks/libgangliaSink.go index 9a45df4..5fd1eb8 100644 --- a/sinks/libgangliaSink.go +++ b/sinks/libgangliaSink.go @@ -109,65 +109,6 @@ type LibgangliaSink struct { cstrCache map[string]*C.char } -func (s *LibgangliaSink) Init(name string, config json.RawMessage) error { - var err error = nil - s.name = fmt.Sprintf("LibgangliaSink(%s)", name) - //s.config.AddTagsAsDesc = false - s.config.AddGangliaGroup = false - s.config.AddTypeToName = false - s.config.AddUnits = true - s.config.GmondConfig = string(GMOND_CONFIG_FILE) - s.config.GangliaLib = string(GANGLIA_LIB_NAME) - if len(config) > 0 { - err = json.Unmarshal(config, &s.config) - if err != nil { - cclog.ComponentError(s.name, "Error reading config:", err.Error()) - return err - } - } - lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS) - if lib == nil { - return fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib) - } - err = lib.Open() - if err != nil { - return fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err) - } - - // Set up cache for the C strings - s.cstrCache = make(map[string]*C.char) - // s.cstrCache["globals"] = C.CString("globals") - - // s.cstrCache["override_hostname"] = C.CString("override_hostname") - // s.cstrCache["override_ip"] = C.CString("override_ip") - - // Add some constant strings - s.cstrCache["GROUP"] = C.CString("GROUP") - s.cstrCache["CLUSTER"] = C.CString("CLUSTER") - s.cstrCache[""] = C.CString("") - - // Add cluster name for lookup in Write() - if len(s.config.ClusterName) > 0 { - s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName) - } - // Add supported types for later lookup in Write() - s.cstrCache["double"] = C.CString("double") - s.cstrCache["int32"] = C.CString("int32") - s.cstrCache["string"] = C.CString("string") - - // Create Ganglia pool - s.global_context = C.Ganglia_pool_create(nil) - // Load Ganglia configuration - s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig) - s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0) - //globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"]) - //override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"]) - //override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"]) - - s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config) - return nil -} - func (s *LibgangliaSink) Write(point lp.CCMetric) error { var err error = nil var c_name *C.char @@ -319,6 +260,60 @@ func (s *LibgangliaSink) Close() { func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error) { s := new(LibgangliaSink) - s.Init(name, config) + var err error = nil + s.name = fmt.Sprintf("LibgangliaSink(%s)", name) + //s.config.AddTagsAsDesc = false + s.config.AddGangliaGroup = false + s.config.AddTypeToName = false + s.config.AddUnits = true + s.config.GmondConfig = string(GMOND_CONFIG_FILE) + s.config.GangliaLib = string(GANGLIA_LIB_NAME) + if len(config) > 0 { + err = json.Unmarshal(config, &s.config) + if err != nil { + cclog.ComponentError(s.name, "Error reading config:", err.Error()) + return nil, err + } + } + lib := dl.New(s.config.GangliaLib, GANGLIA_LIB_DL_FLAGS) + if lib == nil { + return nil, fmt.Errorf("error instantiating DynamicLibrary for %s", s.config.GangliaLib) + } + err = lib.Open() + if err != nil { + return nil, fmt.Errorf("error opening %s: %v", s.config.GangliaLib, err) + } + + // Set up cache for the C strings + s.cstrCache = make(map[string]*C.char) + // s.cstrCache["globals"] = C.CString("globals") + + // s.cstrCache["override_hostname"] = C.CString("override_hostname") + // s.cstrCache["override_ip"] = C.CString("override_ip") + + // Add some constant strings + s.cstrCache["GROUP"] = C.CString("GROUP") + s.cstrCache["CLUSTER"] = C.CString("CLUSTER") + s.cstrCache[""] = C.CString("") + + // Add cluster name for lookup in Write() + if len(s.config.ClusterName) > 0 { + s.cstrCache[s.config.ClusterName] = C.CString(s.config.ClusterName) + } + // Add supported types for later lookup in Write() + s.cstrCache["double"] = C.CString("double") + s.cstrCache["int32"] = C.CString("int32") + s.cstrCache["string"] = C.CString("string") + + // Create Ganglia pool + s.global_context = C.Ganglia_pool_create(nil) + // Load Ganglia configuration + s.cstrCache[s.config.GmondConfig] = C.CString(s.config.GmondConfig) + s.gmond_config = C.Ganglia_gmond_config_create(s.cstrCache[s.config.GmondConfig], 0) + //globals := C.cfg_getsec(gmond_config, s.cstrCache["globals"]) + //override_hostname := C.cfg_getstr(globals, s.cstrCache["override_hostname"]) + //override_ip := C.cfg_getstr(globals, s.cstrCache["override_ip"]) + + s.send_channels = C.Ganglia_udp_send_channels_create(s.global_context, s.gmond_config) return s, nil } diff --git a/sinks/metricSink.go b/sinks/metricSink.go index 4583a2c..8fe02d7 100644 --- a/sinks/metricSink.go +++ b/sinks/metricSink.go @@ -1,8 +1,6 @@ package sinks import ( - "encoding/json" - lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" ) @@ -17,7 +15,6 @@ type sink struct { } type Sink interface { - Init(name string, config json.RawMessage) error Write(point lp.CCMetric) error Flush() error Close() diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 6087da0..0d7987e 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -53,30 +53,6 @@ func (s *NatsSink) connect() error { return nil } -func (s *NatsSink) Init(name string, config json.RawMessage) error { - s.name = fmt.Sprintf("NatsSink(%s)", name) - if len(config) > 0 { - err := json.Unmarshal(config, &s.config) - if err != nil { - cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error()) - return err - } - } - if len(s.config.Host) == 0 || - len(s.config.Port) == 0 || - len(s.config.Database) == 0 { - return errors.New("not all configuration variables set required by NatsSink") - } - // Setup Influx line protocol - s.buffer = &bytes.Buffer{} - s.buffer.Grow(1025) - s.encoder = influx.NewEncoder(s.buffer) - s.encoder.SetPrecision(time.Second) - s.encoder.SetMaxLineBytes(1024) - // Setup infos for connection - return s.connect() -} - func (s *NatsSink) Write(m lp.CCMetric) error { if s.client != nil { _, err := s.encoder.Encode(m.ToPoint(s.config.MetaAsTags)) @@ -108,6 +84,28 @@ func (s *NatsSink) Close() { func NewNatsSink(name string, config json.RawMessage) (Sink, error) { s := new(NatsSink) - s.Init(name, config) + s.name = fmt.Sprintf("NatsSink(%s)", name) + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + cclog.ComponentError(s.name, "Error reading config for", s.name, ":", err.Error()) + return nil, err + } + } + if len(s.config.Host) == 0 || + len(s.config.Port) == 0 || + len(s.config.Database) == 0 { + return nil, errors.New("not all configuration variables set required by NatsSink") + } + // Setup Influx line protocol + s.buffer = &bytes.Buffer{} + s.buffer.Grow(1025) + s.encoder = influx.NewEncoder(s.buffer) + s.encoder.SetPrecision(time.Second) + s.encoder.SetMaxLineBytes(1024) + // Setup infos for connection + if err := s.connect(); err != nil { + return nil, fmt.Errorf("Unable to connect: %v", err) + } return s, nil } diff --git a/sinks/stdoutSink.go b/sinks/stdoutSink.go index d6c2e1b..acf2621 100644 --- a/sinks/stdoutSink.go +++ b/sinks/stdoutSink.go @@ -19,34 +19,6 @@ type StdoutSink struct { } } -func (s *StdoutSink) Init(name string, config json.RawMessage) error { - s.name = fmt.Sprintf("StdoutSink(%s)", name) - if len(config) > 0 { - err := json.Unmarshal(config, &s.config) - if err != nil { - return err - } - } - - s.output = os.Stdout - if len(s.config.Output) > 0 { - switch strings.ToLower(s.config.Output) { - case "stdout": - s.output = os.Stdout - case "stderr": - s.output = os.Stderr - default: - f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600)) - if err != nil { - return err - } - s.output = f - } - } - s.meta_as_tags = s.config.MetaAsTags - return nil -} - func (s *StdoutSink) Write(m lp.CCMetric) error { fmt.Fprint( s.output, @@ -68,6 +40,30 @@ func (s *StdoutSink) Close() { func NewStdoutSink(name string, config json.RawMessage) (Sink, error) { s := new(StdoutSink) - s.Init(name, config) + s.name = fmt.Sprintf("StdoutSink(%s)", name) + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return nil, err + } + } + + s.output = os.Stdout + if len(s.config.Output) > 0 { + switch strings.ToLower(s.config.Output) { + case "stdout": + s.output = os.Stdout + case "stderr": + s.output = os.Stderr + default: + f, err := os.OpenFile(s.config.Output, os.O_CREATE|os.O_WRONLY, os.FileMode(0600)) + if err != nil { + return nil, err + } + s.output = f + } + } + s.meta_as_tags = s.config.MetaAsTags + return s, nil }