Cleanup: Remove unused code

This commit is contained in:
Holger Obermaier 2022-05-06 11:44:57 +02:00
parent e098c33179
commit ee4bd558f1
2 changed files with 41 additions and 79 deletions

View File

@ -25,7 +25,6 @@ type InfluxAsyncSinkConfig struct {
Password string `json:"password,omitempty"` Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"` Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"` SSL bool `json:"ssl,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"`
// Maximum number of points sent to server in single request. Default 5000 // Maximum number of points sent to server in single request. Default 5000
BatchSize uint `json:"batch_size,omitempty"` BatchSize uint `json:"batch_size,omitempty"`
// Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms // Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
@ -186,12 +185,17 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
return nil, err return nil, err
} }
} }
if len(s.config.Host) == 0 || if len(s.config.Port) == 0 {
len(s.config.Port) == 0 || return nil, errors.New("Missing port configuration required by InfluxSink")
len(s.config.Database) == 0 || }
len(s.config.Organization) == 0 || if len(s.config.Database) == 0 {
len(s.config.Password) == 0 { return nil, errors.New("Missing database configuration required by InfluxSink")
return nil, errors.New("not all configuration variables set required by InfluxAsyncSink") }
if len(s.config.Organization) == 0 {
return nil, errors.New("Missing organization configuration required by InfluxSink")
}
if len(s.config.Password) == 0 {
return nil, errors.New("Missing password configuration required by InfluxSink")
} }
// Create lookup map to use meta infos as tags in the output metric // Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool) s.meta_as_tags = make(map[string]bool)

View File

@ -16,37 +16,28 @@ import (
"github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/influxdata/influxdb-client-go/v2/api/write"
) )
type InfluxSinkConfig struct {
defaultSinkConfig
Host string `json:"host,omitempty"`
Port string `json:"port,omitempty"`
Database string `json:"database,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"`
FlushDelay string `json:"flush_delay,omitempty"`
BatchSize int `json:"batch_size,omitempty"`
RetentionPol string `json:"retention_policy,omitempty"`
// InfluxRetryInterval string `json:"retry_interval"`
// InfluxExponentialBase uint `json:"retry_exponential_base"`
// InfluxMaxRetries uint `json:"max_retries"`
// InfluxMaxRetryTime string `json:"max_retry_time"`
//InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it
}
type InfluxSink struct { type InfluxSink struct {
sink sink
client influxdb2.Client client influxdb2.Client
writeApi influxdb2Api.WriteAPIBlocking writeApi influxdb2Api.WriteAPIBlocking
config InfluxSinkConfig config struct {
influxRetryInterval uint defaultSinkConfig
influxMaxRetryTime uint Host string `json:"host,omitempty"`
batch []*write.Point Port string `json:"port,omitempty"`
flushTimer *time.Timer Database string `json:"database,omitempty"`
flushDelay time.Duration User string `json:"user,omitempty"`
lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer Password string `json:"password,omitempty"`
//influxMaxRetryDelay uint Organization string `json:"organization,omitempty"`
SSL bool `json:"ssl,omitempty"`
// Maximum number of points sent to server in single request. Default 100
BatchSize int `json:"batch_size,omitempty"`
// Interval, in which is buffer flushed if it has not been already written (by reaching batch size). Default 1s
FlushInterval string `json:"flush_delay,omitempty"`
}
batch []*write.Point
flushTimer *time.Timer
flushDelay time.Duration
lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
} }
// connect connects to the InfluxDB server // connect connects to the InfluxDB server
@ -76,23 +67,6 @@ func (s *InfluxSink) connect() error {
// Set influxDB client options // Set influxDB client options
clientOptions := influxdb2.DefaultOptions() clientOptions := influxdb2.DefaultOptions()
// if s.influxRetryInterval != 0 {
// cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval)
// clientOptions.SetMaxRetryInterval(s.influxRetryInterval)
// }
// if s.influxMaxRetryTime != 0 {
// cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime)
// clientOptions.SetMaxRetryTime(s.influxMaxRetryTime)
// }
// if s.config.InfluxExponentialBase != 0 {
// cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase)
// clientOptions.SetExponentialBase(s.config.InfluxExponentialBase)
// }
// if s.config.InfluxMaxRetries != 0 {
// cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries)
// clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
// }
// Do not check InfluxDB certificate // Do not check InfluxDB certificate
clientOptions.SetTLSConfig( clientOptions.SetTLSConfig(
&tls.Config{ &tls.Config{
@ -126,11 +100,13 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
} }
// Run a batched flush for all lines that have arrived in the last flush delay interval // Run a batched flush for all lines that have arrived in the last flush delay interval
s.flushTimer = time.AfterFunc(s.flushDelay, func() { s.flushTimer = time.AfterFunc(
if err := s.Flush(); err != nil { s.flushDelay,
cclog.ComponentError(s.name, "flush failed:", err.Error()) func() {
} if err := s.Flush(); err != nil {
}) cclog.ComponentError(s.name, "flush failed:", err.Error())
}
})
} }
// Append metric to batch slice // Append metric to batch slice
@ -194,7 +170,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
// Set config default values // Set config default values
s.config.BatchSize = 100 s.config.BatchSize = 100
s.config.FlushDelay = "1s" s.config.FlushInterval = "1s"
// Read config // Read config
if len(config) > 0 { if len(config) > 0 {
@ -203,12 +179,6 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
return nil, err return nil, err
} }
} }
s.influxRetryInterval = 0
s.influxMaxRetryTime = 0
// s.config.InfluxRetryInterval = ""
// s.config.InfluxMaxRetryTime = ""
// s.config.InfluxMaxRetries = 0
// s.config.InfluxExponentialBase = 0
if len(s.config.Host) == 0 { if len(s.config.Host) == 0 {
return nil, errors.New("Missing host configuration required by InfluxSink") return nil, errors.New("Missing host configuration required by InfluxSink")
@ -232,21 +202,9 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
s.meta_as_tags[k] = true s.meta_as_tags[k] = true
} }
// toUint := func(duration string, def uint) uint {
// if len(duration) > 0 {
// t, err := time.ParseDuration(duration)
// if err == nil {
// return uint(t.Milliseconds())
// }
// }
// return def
// }
// s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
// s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
// Configure flush delay duration // Configure flush delay duration
if len(s.config.FlushDelay) > 0 { if len(s.config.FlushInterval) > 0 {
t, err := time.ParseDuration(s.config.FlushDelay) t, err := time.ParseDuration(s.config.FlushInterval)
if err == nil { if err == nil {
s.flushDelay = t s.flushDelay = t
} }