mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-21 21:35:06 +02:00 
			
		
		
		
	Add non-blocking InfluxDB sink
This commit is contained in:
		
							
								
								
									
										113
									
								
								sinks/influxAsyncSink.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										113
									
								
								sinks/influxAsyncSink.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,113 @@ | |||||||
|  | package sinks | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	//	"context" | ||||||
|  | 	"crypto/tls" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"errors" | ||||||
|  | 	"fmt" | ||||||
|  |  | ||||||
|  | 	cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" | ||||||
|  | 	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" | ||||||
|  | 	influxdb2 "github.com/influxdata/influxdb-client-go/v2" | ||||||
|  | 	influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" | ||||||
|  | 	"github.com/influxdata/influxdb-client-go/v2/api/write" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type InfluxAsyncSinkConfig struct { | ||||||
|  | 	defaultSinkConfig | ||||||
|  | 	Host         string `json:"host,omitempty"` | ||||||
|  | 	Port         string `json:"port,omitempty"` | ||||||
|  | 	Database     string `json:"database,omitempty"` | ||||||
|  | 	User         string `json:"user,omitempty"` | ||||||
|  | 	Password     string `json:"password,omitempty"` | ||||||
|  | 	Organization string `json:"organization,omitempty"` | ||||||
|  | 	SSL          bool   `json:"ssl,omitempty"` | ||||||
|  | 	RetentionPol string `json:"retention_policy,omitempty"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type InfluxAsyncSink struct { | ||||||
|  | 	sink | ||||||
|  | 	client    influxdb2.Client | ||||||
|  | 	writeApi  influxdb2Api.WriteAPI | ||||||
|  | 	retPolicy string | ||||||
|  | 	errors    <-chan error | ||||||
|  | 	config    InfluxAsyncSinkConfig | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *InfluxAsyncSink) connect() error { | ||||||
|  | 	var auth string | ||||||
|  | 	var uri string | ||||||
|  | 	if s.config.SSL { | ||||||
|  | 		uri = fmt.Sprintf("https://%s:%s", s.config.Host, s.config.Port) | ||||||
|  | 	} else { | ||||||
|  | 		uri = fmt.Sprintf("http://%s:%s", s.config.Host, s.config.Port) | ||||||
|  | 	} | ||||||
|  | 	if len(s.config.User) == 0 { | ||||||
|  | 		auth = s.config.Password | ||||||
|  | 	} else { | ||||||
|  | 		auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password) | ||||||
|  | 	} | ||||||
|  | 	cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database) | ||||||
|  | 	s.client = influxdb2.NewClientWithOptions(uri, auth, | ||||||
|  | 		influxdb2.DefaultOptions().SetBatchSize(20).SetTLSConfig(&tls.Config{ | ||||||
|  | 			InsecureSkipVerify: true, | ||||||
|  | 		})) | ||||||
|  | 	s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database) | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *InfluxAsyncSink) Init(config json.RawMessage) error { | ||||||
|  | 	s.name = "InfluxSink" | ||||||
|  | 	if len(config) > 0 { | ||||||
|  | 		err := json.Unmarshal(config, &s.config) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if len(s.config.Host) == 0 || | ||||||
|  | 		len(s.config.Port) == 0 || | ||||||
|  | 		len(s.config.Database) == 0 || | ||||||
|  | 		len(s.config.Organization) == 0 || | ||||||
|  | 		len(s.config.Password) == 0 { | ||||||
|  | 		return errors.New("not all configuration variables set required by InfluxAsyncSink") | ||||||
|  | 	} | ||||||
|  | 	err := s.connect() | ||||||
|  | 	s.errors = s.writeApi.Errors() | ||||||
|  | 	go func() { | ||||||
|  | 		for err := range s.errors { | ||||||
|  | 			cclog.ComponentError(s.name, err.Error()) | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *InfluxAsyncSink) Write(point lp.CCMetric) error { | ||||||
|  | 	var p *write.Point | ||||||
|  | 	if s.config.MetaAsTags { | ||||||
|  | 		tags := map[string]string{} | ||||||
|  | 		for k, v := range point.Tags() { | ||||||
|  | 			tags[k] = v | ||||||
|  | 		} | ||||||
|  | 		for k, v := range point.Meta() { | ||||||
|  | 			tags[k] = v | ||||||
|  | 		} | ||||||
|  | 		p = influxdb2.NewPoint(point.Name(), tags, point.Fields(), point.Time()) | ||||||
|  | 	} else { | ||||||
|  | 		p = influxdb2.NewPoint(point.Name(), point.Tags(), point.Fields(), point.Time()) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	s.writeApi.WritePoint(p) | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *InfluxAsyncSink) Flush() error { | ||||||
|  | 	s.writeApi.Flush() | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *InfluxAsyncSink) Close() { | ||||||
|  | 	cclog.ComponentDebug(s.name, "Closing InfluxDB connection") | ||||||
|  | 	s.writeApi.Flush() | ||||||
|  | 	s.client.Close() | ||||||
|  | } | ||||||
							
								
								
									
										32
									
								
								sinks/influxAsyncSink.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										32
									
								
								sinks/influxAsyncSink.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,32 @@ | |||||||
|  | ## `influxasync` sink | ||||||
|  |  | ||||||
|  | The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2) to write the metrics to an InfluxDB database in a **non-blocking** fashion. It provides only support for V2 write endpoints (InfluxDB 1.8.0 or later). | ||||||
|  |  | ||||||
|  |  | ||||||
|  | ### Configuration structure | ||||||
|  |  | ||||||
|  | ```json | ||||||
|  | { | ||||||
|  |   "<name>": { | ||||||
|  |     "type": "influxasync", | ||||||
|  |     "meta_as_tags" : true, | ||||||
|  |     "database" : "mymetrics", | ||||||
|  |     "host": "dbhost.example.com", | ||||||
|  |     "port": "4222", | ||||||
|  |     "user": "exampleuser", | ||||||
|  |     "password" : "examplepw", | ||||||
|  |     "organization": "myorg", | ||||||
|  |     "ssl": true, | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | - `type`: makes the sink an `influxdb` sink | ||||||
|  | - `meta_as_tags`: print all meta information as tags in the output (optional) | ||||||
|  | - `database`: All metrics are written to this bucket  | ||||||
|  | - `host`: Hostname of the InfluxDB database server | ||||||
|  | - `port`: Portnumber (as string) of the InfluxDB database server | ||||||
|  | - `user`: Username for basic authentification | ||||||
|  | - `password`: Password for basic authentification | ||||||
|  | - `organization`: Organization in the InfluxDB | ||||||
|  | - `ssl`: Use SSL connection | ||||||
| @@ -1,6 +1,6 @@ | |||||||
| ## `influxdb` sink | ## `influxdb` sink | ||||||
|  |  | ||||||
| The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2) to write the metrics to an InfluxDB database. It provides only support for V2 write endpoints (InfluxDB 1.8.0 or later). | The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2) to write the metrics to an InfluxDB database in a **blocking** fashion. It provides only support for V2 write endpoints (InfluxDB 1.8.0 or later). | ||||||
|  |  | ||||||
|  |  | ||||||
| ### Configuration structure | ### Configuration structure | ||||||
|   | |||||||
| @@ -12,11 +12,12 @@ import ( | |||||||
|  |  | ||||||
| // Map of all available sinks | // Map of all available sinks | ||||||
| var AvailableSinks = map[string]Sink{ | var AvailableSinks = map[string]Sink{ | ||||||
| 	"influxdb": new(InfluxSink), | 	"influxdb":    new(InfluxSink), | ||||||
| 	"stdout":   new(StdoutSink), | 	"stdout":      new(StdoutSink), | ||||||
| 	"nats":     new(NatsSink), | 	"nats":        new(NatsSink), | ||||||
| 	"http":     new(HttpSink), | 	"http":        new(HttpSink), | ||||||
| 	"ganglia":  new(GangliaSink), | 	"ganglia":     new(GangliaSink), | ||||||
|  | 	"influxasync": new(InfluxAsyncSink), | ||||||
| } | } | ||||||
|  |  | ||||||
| // Metric collector manager data structure | // Metric collector manager data structure | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user