diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go new file mode 100644 index 0000000..0763a4b --- /dev/null +++ b/sinks/influxAsyncSink.go @@ -0,0 +1,119 @@ +package sinks + +import ( + // "context" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + + cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" + lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/write" +) + +type InfluxAsyncSinkConfig struct { + defaultSinkConfig + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Database string `json:"database,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + Organization string `json:"organization,omitempty"` + SSL bool `json:"ssl,omitempty"` + RetentionPol string `json:"retention_policy,omitempty"` + BatchSize uint `json:"batch_size,omitempty"` +} + +type InfluxAsyncSink struct { + sink + client influxdb2.Client + writeApi influxdb2Api.WriteAPI + retPolicy string + errors <-chan error + config InfluxAsyncSinkConfig +} + +func (s *InfluxAsyncSink) connect() error { + var auth string + var uri string + if s.config.SSL { + uri = fmt.Sprintf("https://%s:%s", s.config.Host, s.config.Port) + } else { + uri = fmt.Sprintf("http://%s:%s", s.config.Host, s.config.Port) + } + if len(s.config.User) == 0 { + auth = s.config.Password + } else { + auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password) + } + cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database) + batch := s.config.BatchSize + if batch == 0 { + batch = 100 + } + s.client = influxdb2.NewClientWithOptions(uri, auth, + influxdb2.DefaultOptions().SetBatchSize(batch).SetTLSConfig(&tls.Config{ + InsecureSkipVerify: true, + })) + s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database) + return nil +} + +func (s *InfluxAsyncSink) Init(config json.RawMessage) error { + s.name = "InfluxSink" + s.config.BatchSize = 100 + if len(config) > 0 { + err := json.Unmarshal(config, &s.config) + if err != nil { + return err + } + } + if len(s.config.Host) == 0 || + len(s.config.Port) == 0 || + len(s.config.Database) == 0 || + len(s.config.Organization) == 0 || + len(s.config.Password) == 0 { + return errors.New("not all configuration variables set required by InfluxAsyncSink") + } + err := s.connect() + s.errors = s.writeApi.Errors() + go func() { + for err := range s.errors { + cclog.ComponentError(s.name, err.Error()) + } + }() + return err +} + +func (s *InfluxAsyncSink) Write(point lp.CCMetric) error { + var p *write.Point + if s.config.MetaAsTags { + tags := map[string]string{} + for k, v := range point.Tags() { + tags[k] = v + } + for k, v := range point.Meta() { + tags[k] = v + } + p = influxdb2.NewPoint(point.Name(), tags, point.Fields(), point.Time()) + } else { + p = influxdb2.NewPoint(point.Name(), point.Tags(), point.Fields(), point.Time()) + } + + s.writeApi.WritePoint(p) + return nil +} + +func (s *InfluxAsyncSink) Flush() error { + s.writeApi.Flush() + return nil +} + +func (s *InfluxAsyncSink) Close() { + cclog.ComponentDebug(s.name, "Closing InfluxDB connection") + s.writeApi.Flush() + s.client.Close() +} diff --git a/sinks/influxAsyncSink.md b/sinks/influxAsyncSink.md new file mode 100644 index 0000000..286c93c --- /dev/null +++ b/sinks/influxAsyncSink.md @@ -0,0 +1,34 @@ +## `influxasync` sink + +The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2) to write the metrics to an InfluxDB database in a **non-blocking** fashion. It provides only support for V2 write endpoints (InfluxDB 1.8.0 or later). + + +### Configuration structure + +```json +{ + "": { + "type": "influxasync", + "meta_as_tags" : true, + "database" : "mymetrics", + "host": "dbhost.example.com", + "port": "4222", + "user": "exampleuser", + "password" : "examplepw", + "organization": "myorg", + "ssl": true, + "batch_size": 200, + } +} +``` + +- `type`: makes the sink an `influxdb` sink +- `meta_as_tags`: print all meta information as tags in the output (optional) +- `database`: All metrics are written to this bucket +- `host`: Hostname of the InfluxDB database server +- `port`: Portnumber (as string) of the InfluxDB database server +- `user`: Username for basic authentification +- `password`: Password for basic authentification +- `organization`: Organization in the InfluxDB +- `ssl`: Use SSL connection +- `batch_size`: batch up metrics internally, default 100 \ No newline at end of file diff --git a/sinks/influxSink.md b/sinks/influxSink.md index 2624034..bd0f576 100644 --- a/sinks/influxSink.md +++ b/sinks/influxSink.md @@ -1,6 +1,6 @@ ## `influxdb` sink -The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2) to write the metrics to an InfluxDB database. It provides only support for V2 write endpoints (InfluxDB 1.8.0 or later). +The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2) to write the metrics to an InfluxDB database in a **blocking** fashion. It provides only support for V2 write endpoints (InfluxDB 1.8.0 or later). ### Configuration structure diff --git a/sinks/sinkManager.go b/sinks/sinkManager.go index 21c392f..09b4fc4 100644 --- a/sinks/sinkManager.go +++ b/sinks/sinkManager.go @@ -12,11 +12,12 @@ import ( // Map of all available sinks var AvailableSinks = map[string]Sink{ - "influxdb": new(InfluxSink), - "stdout": new(StdoutSink), - "nats": new(NatsSink), - "http": new(HttpSink), - "ganglia": new(GangliaSink), + "influxdb": new(InfluxSink), + "stdout": new(StdoutSink), + "nats": new(NatsSink), + "http": new(HttpSink), + "ganglia": new(GangliaSink), + "influxasync": new(InfluxAsyncSink), } // Metric collector manager data structure