diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 1987342..e8b16d8 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -6,28 +6,32 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/write" ) type InfluxSinkConfig struct { defaultSinkConfig - Host string `json:"host,omitempty"` - Port string `json:"port,omitempty"` - Database string `json:"database,omitempty"` - User string `json:"user,omitempty"` - Password string `json:"password,omitempty"` - Organization string `json:"organization,omitempty"` - SSL bool `json:"ssl,omitempty"` - RetentionPol string `json:"retention_policy,omitempty"` - InfluxRetryInterval string `json:"retry_interval"` - InfluxExponentialBase uint `json:"retry_exponential_base"` - InfluxMaxRetries uint `json:"max_retries"` - InfluxMaxRetryTime string `json:"max_retry_time"` + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Database string `json:"database,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + Organization string `json:"organization,omitempty"` + SSL bool `json:"ssl,omitempty"` + FlushDelay string `json:"flush_delay,omitempty"` + BatchSize int `json:"batch_size,omitempty"` + RetentionPol string `json:"retention_policy,omitempty"` + // InfluxRetryInterval string `json:"retry_interval"` + // InfluxExponentialBase uint `json:"retry_exponential_base"` + // InfluxMaxRetries uint `json:"max_retries"` + // InfluxMaxRetryTime string `json:"max_retry_time"` //InfluxMaxRetryDelay string `json:"max_retry_delay"` // It is mentioned in the docs but there is no way to set it } @@ -38,6 +42,10 @@ type InfluxSink struct { config InfluxSinkConfig influxRetryInterval uint influxMaxRetryTime uint + batch []*write.Point + flushTimer *time.Timer + flushDelay time.Duration + lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer //influxMaxRetryDelay uint } @@ -56,16 +64,31 @@ func (s *InfluxSink) connect() error { } cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database) clientOptions := influxdb2.DefaultOptions() + + // if s.influxRetryInterval != 0 { + // cclog.ComponentDebug(s.name, "MaxRetryInterval", s.influxRetryInterval) + // clientOptions.SetMaxRetryInterval(s.influxRetryInterval) + // } + // if s.influxMaxRetryTime != 0 { + // cclog.ComponentDebug(s.name, "MaxRetryTime", s.influxMaxRetryTime) + // clientOptions.SetMaxRetryTime(s.influxMaxRetryTime) + // } + // if s.config.InfluxExponentialBase != 0 { + // cclog.ComponentDebug(s.name, "Exponential Base", s.config.InfluxExponentialBase) + // clientOptions.SetExponentialBase(s.config.InfluxExponentialBase) + // } + // if s.config.InfluxMaxRetries != 0 { + // cclog.ComponentDebug(s.name, "Max Retries", s.config.InfluxMaxRetries) + // clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) + // } + clientOptions.SetTLSConfig( &tls.Config{ InsecureSkipVerify: true, }, ) - clientOptions.SetMaxRetryInterval(s.influxRetryInterval) - clientOptions.SetMaxRetryTime(s.influxMaxRetryTime) - clientOptions.SetExponentialBase(s.config.InfluxExponentialBase) - clientOptions.SetMaxRetries(s.config.InfluxMaxRetries) + clientOptions.SetPrecision(time.Second) s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) s.writeApi = s.client.WriteAPIBlocking(s.config.Organization, s.config.Database) @@ -80,38 +103,76 @@ func (s *InfluxSink) connect() error { } func (s *InfluxSink) Write(m lp.CCMetric) error { - err := - s.writeApi.WritePoint( - context.Background(), - m.ToPoint(s.meta_as_tags), - ) - return err + // err := + // s.writeApi.WritePoint( + // context.Background(), + // m.ToPoint(s.meta_as_tags), + // ) + if len(s.batch) == 0 && s.flushDelay != 0 { + // This is the first write since the last flush, start the flushTimer! + if s.flushTimer != nil && s.flushTimer.Stop() { + cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?") + } + + // Run a batched flush for all lines that have arrived in the last second + s.flushTimer = time.AfterFunc(s.flushDelay, func() { + if err := s.Flush(); err != nil { + cclog.ComponentError(s.name, "flush failed:", err.Error()) + } + }) + } + p := m.ToPoint(s.meta_as_tags) + s.lock.Lock() + s.batch = append(s.batch, p) + s.lock.Unlock() + + // Flush synchronously if "flush_delay" is zero + if s.flushDelay == 0 { + return s.Flush() + } + + return nil } func (s *InfluxSink) Flush() error { + s.lock.Lock() + defer s.lock.Unlock() + if len(s.batch) == 0 { + return nil + } + err := s.writeApi.WritePoint(context.Background(), s.batch...) + if err != nil { + cclog.ComponentError(s.name, "flush failed:", err.Error()) + return err + } + s.batch = s.batch[:0] return nil } func (s *InfluxSink) Close() { cclog.ComponentDebug(s.name, "Closing InfluxDB connection") + s.flushTimer.Stop() + s.Flush() s.client.Close() } func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { s := new(InfluxSink) s.name = fmt.Sprintf("InfluxSink(%s)", name) + s.config.BatchSize = 100 + s.config.FlushDelay = "1s" if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { return nil, err } } - s.influxRetryInterval = uint(time.Duration(1) * time.Second) - s.config.InfluxRetryInterval = "1s" - s.influxMaxRetryTime = uint(7 * time.Duration(24) * time.Hour) - s.config.InfluxMaxRetryTime = "168h" - s.config.InfluxMaxRetries = 20 - s.config.InfluxExponentialBase = 2 + s.influxRetryInterval = 0 + s.influxMaxRetryTime = 0 + // s.config.InfluxRetryInterval = "" + // s.config.InfluxMaxRetryTime = "" + // s.config.InfluxMaxRetries = 0 + // s.config.InfluxExponentialBase = 0 if len(s.config.Host) == 0 || len(s.config.Port) == 0 || @@ -126,15 +187,25 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { s.meta_as_tags[k] = true } - toUint := func(duration string, def uint) uint { - t, err := time.ParseDuration(duration) + // toUint := func(duration string, def uint) uint { + // if len(duration) > 0 { + // t, err := time.ParseDuration(duration) + // if err == nil { + // return uint(t.Milliseconds()) + // } + // } + // return def + // } + // s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval) + // s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime) + + if len(s.config.FlushDelay) > 0 { + t, err := time.ParseDuration(s.config.FlushDelay) if err == nil { - return uint(t.Milliseconds()) + s.flushDelay = t } - return def } - s.influxRetryInterval = toUint(s.config.InfluxRetryInterval, s.influxRetryInterval) - s.influxMaxRetryTime = toUint(s.config.InfluxMaxRetryTime, s.influxMaxRetryTime) + s.batch = make([]*write.Point, 0, s.config.BatchSize) // Connect to InfluxDB server if err := s.connect(); err != nil { diff --git a/sinks/influxSink.md b/sinks/influxSink.md index a099895..8f9ce83 100644 --- a/sinks/influxSink.md +++ b/sinks/influxSink.md @@ -17,10 +17,8 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de "password" : "examplepw", "organization": "myorg", "ssl": true, - "retry_interval" : "1s", - "retry_exponential_base" : 2, - "max_retries": 20, - "max_retry_time" : "168h" + "flush_delay" : "1s", + "batch_size" : 100 } } ``` @@ -34,9 +32,6 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de - `password`: Password for basic authentification - `organization`: Organization in the InfluxDB - `ssl`: Use SSL connection -- `retry_interval`: Base retry interval for failed write requests, default 1s -- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2 -- `max_retries`: Maximal number of retry attempts -- `max_retry_time`: Maximal time to retry failed writes, default 168h (one week) +- `flush_delay`: Group metrics coming in to a single batch +- `batch_size`: Maximal batch size -For information about the calculation of the retry interval settings, see [offical influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes) \ No newline at end of file