mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-31 09:05:05 +01:00 
			
		
		
		
	Ping InfluxDB server after connecting to recognize faulty connections
This commit is contained in:
		| @@ -1,6 +1,7 @@ | |||||||
| package sinks | package sinks | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"context" | ||||||
| 	"crypto/tls" | 	"crypto/tls" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"errors" | 	"errors" | ||||||
| @@ -64,6 +65,13 @@ func (s *InfluxAsyncSink) connect() error { | |||||||
| 	) | 	) | ||||||
| 	s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) | 	s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) | ||||||
| 	s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database) | 	s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database) | ||||||
|  | 	ok, err := s.client.Ping(context.Background()) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if !ok { | ||||||
|  | 		return fmt.Errorf("connection to %s not healthy", uri) | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -108,7 +116,7 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { | |||||||
|  |  | ||||||
| 	// Connect to InfluxDB server | 	// Connect to InfluxDB server | ||||||
| 	if err := s.connect(); err != nil { | 	if err := s.connect(); err != nil { | ||||||
| 		return nil, fmt.Errorf("Unable to connect: %v", err) | 		return nil, fmt.Errorf("unable to connect: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Start background: Read from error channel | 	// Start background: Read from error channel | ||||||
|   | |||||||
| @@ -54,6 +54,13 @@ func (s *InfluxSink) connect() error { | |||||||
| 	) | 	) | ||||||
| 	s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) | 	s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) | ||||||
| 	s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database) | 	s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database) | ||||||
|  | 	ok, err := s.client.Ping(context.Background()) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	if !ok { | ||||||
|  | 		return fmt.Errorf("connection to %s not healthy", uri) | ||||||
|  | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -94,7 +101,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { | |||||||
|  |  | ||||||
| 	// Connect to InfluxDB server | 	// Connect to InfluxDB server | ||||||
| 	if err := s.connect(); err != nil { | 	if err := s.connect(); err != nil { | ||||||
| 		return nil, fmt.Errorf("Unable to connect: %v", err) | 		return nil, fmt.Errorf("unable to connect: %v", err) | ||||||
| 	} | 	} | ||||||
| 	return s, nil | 	return s, nil | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user