Flush if batch size is reached

This commit is contained in:
Holger Obermaier 2022-05-04 11:28:06 +02:00
parent c019f8e7ad
commit c35ac9dba8

View File

@ -49,20 +49,31 @@ type InfluxSink struct {
//influxMaxRetryDelay uint //influxMaxRetryDelay uint
} }
// connect connects to the InfluxDB server
func (s *InfluxSink) connect() error { func (s *InfluxSink) connect() error {
var auth string
// URI options:
// * http://host:port
// * https://host:port
var uri string var uri string
if s.config.SSL { if s.config.SSL {
uri = fmt.Sprintf("https://%s:%s", s.config.Host, s.config.Port) uri = fmt.Sprintf("https://%s:%s", s.config.Host, s.config.Port)
} else { } else {
uri = fmt.Sprintf("http://%s:%s", s.config.Host, s.config.Port) uri = fmt.Sprintf("http://%s:%s", s.config.Host, s.config.Port)
} }
// Authentication options:
// * token
// * username:password
var auth string
if len(s.config.User) == 0 { if len(s.config.User) == 0 {
auth = s.config.Password auth = s.config.Password
} else { } else {
auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password) auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password)
} }
cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database) cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
// Set influxDB client options
clientOptions := influxdb2.DefaultOptions() clientOptions := influxdb2.DefaultOptions()
// if s.influxRetryInterval != 0 { // if s.influxRetryInterval != 0 {
@ -82,6 +93,7 @@ func (s *InfluxSink) connect() error {
// clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) // clientOptions.SetMaxRetries(s.config.InfluxMaxRetries)
// } // }
// Do not check InfluxDB certificate
clientOptions.SetTLSConfig( clientOptions.SetTLSConfig(
&tls.Config{ &tls.Config{
InsecureSkipVerify: true, InsecureSkipVerify: true,
@ -90,8 +102,11 @@ func (s *InfluxSink) connect() error {
clientOptions.SetPrecision(time.Second) clientOptions.SetPrecision(time.Second)
// Create new writeAPI
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database) s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database)
// Check InfluxDB server accessibility
ok, err := s.client.Ping(context.Background()) ok, err := s.client.Ping(context.Background())
if err != nil { if err != nil {
return err return err
@ -103,24 +118,22 @@ func (s *InfluxSink) connect() error {
} }
func (s *InfluxSink) Write(m lp.CCMetric) error { func (s *InfluxSink) Write(m lp.CCMetric) error {
// err :=
// s.writeApi.WritePoint(
// context.Background(),
// m.ToPoint(s.meta_as_tags),
// )
if len(s.batch) == 0 && s.flushDelay != 0 { if len(s.batch) == 0 && s.flushDelay != 0 {
// This is the first write since the last flush, start the flushTimer! // This is the first write since the last flush, start the flushTimer!
if s.flushTimer != nil && s.flushTimer.Stop() { if s.flushTimer != nil && s.flushTimer.Stop() {
cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?") cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
} }
// Run a batched flush for all lines that have arrived in the last second // Run a batched flush for all lines that have arrived in the last flush delay interval
s.flushTimer = time.AfterFunc(s.flushDelay, func() { s.flushTimer = time.AfterFunc(s.flushDelay, func() {
if err := s.Flush(); err != nil { if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error()) cclog.ComponentError(s.name, "flush failed:", err.Error())
} }
}) })
} }
// Append metric to batch slice
p := m.ToPoint(s.meta_as_tags) p := m.ToPoint(s.meta_as_tags)
s.lock.Lock() s.lock.Lock()
s.batch = append(s.batch, p) s.batch = append(s.batch, p)
@ -131,21 +144,39 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
return s.Flush() return s.Flush()
} }
// Flush if batch size is reached
if len(s.batch) == s.config.BatchSize {
return s.Flush()
}
return nil return nil
} }
// Flush sends all metrics buffered in batch slice to InfluxDB server
func (s *InfluxSink) Flush() error { func (s *InfluxSink) Flush() error {
// Lock access to batch slice
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
// Nothing to do, batch slice is empty
if len(s.batch) == 0 { if len(s.batch) == 0 {
return nil return nil
} }
// Send metrics from batch slice
err := s.writeApi.WritePoint(context.Background(), s.batch...) err := s.writeApi.WritePoint(context.Background(), s.batch...)
if err != nil { if err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error()) cclog.ComponentError(s.name, "flush failed:", err.Error())
return err return err
} }
// Clear batch slice
for i := range s.batch {
s.batch[i] = nil
}
s.batch = s.batch[:0] s.batch = s.batch[:0]
return nil return nil
} }
@ -156,11 +187,16 @@ func (s *InfluxSink) Close() {
s.client.Close() s.client.Close()
} }
// NewInfluxSink create a new InfluxDB sink
func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
s := new(InfluxSink) s := new(InfluxSink)
s.name = fmt.Sprintf("InfluxSink(%s)", name) s.name = fmt.Sprintf("InfluxSink(%s)", name)
// Set config default values
s.config.BatchSize = 100 s.config.BatchSize = 100
s.config.FlushDelay = "1s" s.config.FlushDelay = "1s"
// Read config
if len(config) > 0 { if len(config) > 0 {
err := json.Unmarshal(config, &s.config) err := json.Unmarshal(config, &s.config)
if err != nil { if err != nil {
@ -174,13 +210,22 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
// s.config.InfluxMaxRetries = 0 // s.config.InfluxMaxRetries = 0
// s.config.InfluxExponentialBase = 0 // s.config.InfluxExponentialBase = 0
if len(s.config.Host) == 0 || if len(s.config.Host) == 0 {
len(s.config.Port) == 0 || return nil, errors.New("Missing host configuration required by InfluxSink")
len(s.config.Database) == 0 ||
len(s.config.Organization) == 0 ||
len(s.config.Password) == 0 {
return nil, errors.New("not all configuration variables set required by InfluxSink")
} }
if len(s.config.Port) == 0 {
return nil, errors.New("Missing port configuration required by InfluxSink")
}
if len(s.config.Database) == 0 {
return nil, errors.New("Missing database configuration required by InfluxSink")
}
if len(s.config.Organization) == 0 {
return nil, errors.New("Missing organization configuration required by InfluxSink")
}
if len(s.config.Password) == 0 {
return nil, errors.New("Missing password configuration required by InfluxSink")
}
// Create lookup map to use meta infos as tags in the output metric // Create lookup map to use meta infos as tags in the output metric
s.meta_as_tags = make(map[string]bool) s.meta_as_tags = make(map[string]bool)
for _, k := range s.config.MetaAsTags { for _, k := range s.config.MetaAsTags {
@ -199,12 +244,15 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
// s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval) // s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval)
// s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime) // s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime)
// Configure flush delay duration
if len(s.config.FlushDelay) > 0 { if len(s.config.FlushDelay) > 0 {
t, err := time.ParseDuration(s.config.FlushDelay) t, err := time.ParseDuration(s.config.FlushDelay)
if err == nil { if err == nil {
s.flushDelay = t s.flushDelay = t
} }
} }
// allocate batch slice
s.batch = make([]*write.Point, 0, s.config.BatchSize) s.batch = make([]*write.Point, 0, s.config.BatchSize)
// Connect to InfluxDB server // Connect to InfluxDB server