From c35ac9dba840995b32e52ecaa2b3585932f0dde4 Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Wed, 4 May 2022 11:28:06 +0200 Subject: [PATCH] Flush if batch size is reached --- sinks/influxSink.go | 74 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 61 insertions(+), 13 deletions(-) diff --git a/sinks/influxSink.go b/sinks/influxSink.go index e8b16d8..b382c38 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -49,20 +49,31 @@ type InfluxSink struct { //influxMaxRetryDelay uint } +// connect connects to the InfluxDB server func (s *InfluxSink) connect() error { - var auth string + + // URI options: + // * http://host:port + // * https://host:port var uri string if s.config.SSL { uri = fmt.Sprintf("https://%s:%s", s.config.Host, s.config.Port) } else { uri = fmt.Sprintf("http://%s:%s", s.config.Host, s.config.Port) } + + // Authentication options: + // * token + // * username:password + var auth string if len(s.config.User) == 0 { auth = s.config.Password } else { auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password) } cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database) + + // Set influxDB client options clientOptions := influxdb2.DefaultOptions() // if s.influxRetryInterval != 0 { @@ -82,6 +93,7 @@ func (s *InfluxSink) connect() error { // clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) // } + // Do not check InfluxDB certificate clientOptions.SetTLSConfig( &tls.Config{ InsecureSkipVerify: true, @@ -90,8 +102,11 @@ func (s *InfluxSink) connect() error { clientOptions.SetPrecision(time.Second) + // Create new writeAPI s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database) + + // Check InfluxDB server accessibility ok, err := s.client.Ping(context.Background()) if err != nil { return err @@ -103,24 +118,22 @@ func (s *InfluxSink) connect() error { } func (s *InfluxSink) Write(m lp.CCMetric) error { - // err := - // s.writeApi.WritePoint( - // context.Background(), - // m.ToPoint(s.meta_as_tags), - // ) + if len(s.batch) == 0 && s.flushDelay != 0 { // This is the first write since the last flush, start the flushTimer! if s.flushTimer != nil && s.flushTimer.Stop() { cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?") } - // Run a batched flush for all lines that have arrived in the last second + // Run a batched flush for all lines that have arrived in the last flush delay interval s.flushTimer = time.AfterFunc(s.flushDelay, func() { if err := s.Flush(); err != nil { cclog.ComponentError(s.name, "flush failed:", err.Error()) } }) } + + // Append metric to batch slice p := m.ToPoint(s.meta_as_tags) s.lock.Lock() s.batch = append(s.batch, p) @@ -131,21 +144,39 @@ func (s *InfluxSink) Write(m lp.CCMetric) error { return s.Flush() } + // Flush if batch size is reached + if len(s.batch) == s.config.BatchSize { + return s.Flush() + } + return nil } +// Flush sends all metrics buffered in batch slice to InfluxDB server func (s *InfluxSink) Flush() error { + + // Lock access to batch slice s.lock.Lock() defer s.lock.Unlock() + + // Nothing to do, batch slice is empty if len(s.batch) == 0 { return nil } + + // Send metrics from batch slice err := s.writeApi.WritePoint(context.Background(), s.batch...) if err != nil { cclog.ComponentError(s.name, "flush failed:", err.Error()) return err } + + // Clear batch slice + for i := range s.batch { + s.batch[i] = nil + } s.batch = s.batch[:0] + return nil } @@ -156,11 +187,16 @@ func (s *InfluxSink) Close() { s.client.Close() } +// NewInfluxSink create a new InfluxDB sink func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { s := new(InfluxSink) s.name = fmt.Sprintf("InfluxSink(%s)", name) + + // Set config default values s.config.BatchSize = 100 s.config.FlushDelay = "1s" + + // Read config if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -174,13 +210,22 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { // s.config.InfluxMaxRetries = 0 // s.config.InfluxExponentialBase = 0 - if len(s.config.Host) == 0 || - len(s.config.Port) == 0 || - len(s.config.Database) == 0 || - len(s.config.Organization) == 0 || - len(s.config.Password) == 0 { - return nil, errors.New("not all configuration variables set required by InfluxSink") + if len(s.config.Host) == 0 { + return nil, errors.New("Missing host configuration required by InfluxSink") } + if len(s.config.Port) == 0 { + return nil, errors.New("Missing port configuration required by InfluxSink") + } + if len(s.config.Database) == 0 { + return nil, errors.New("Missing database configuration required by InfluxSink") + } + if len(s.config.Organization) == 0 { + return nil, errors.New("Missing organization configuration required by InfluxSink") + } + if len(s.config.Password) == 0 { + return nil, errors.New("Missing password configuration required by InfluxSink") + } + // Create lookup map to use meta infos as tags in the output metric s.meta_as_tags = make(map[string]bool) for _, k := range s.config.MetaAsTags { @@ -199,12 +244,15 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { // s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval) // s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime) + // Configure flush delay duration if len(s.config.FlushDelay) > 0 { t, err := time.ParseDuration(s.config.FlushDelay) if err == nil { s.flushDelay = t } } + + // allocate batch slice s.batch = make([]*write.Point, 0, s.config.BatchSize) // Connect to InfluxDB server