diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index c2956fc..31d127d 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -6,12 +6,14 @@ import ( "encoding/json" "errors" "fmt" + "strings" "time" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" + influxdb2ApiHttp "github.com/influxdata/influxdb-client-go/v2/api/http" ) type InfluxAsyncSinkConfig struct { @@ -33,6 +35,7 @@ type InfluxAsyncSinkConfig struct { InfluxMaxRetries uint `json:"max_retries,omitempty"` InfluxMaxRetryTime string `json:"max_retry_time,omitempty"` CustomFlushInterval string `json:"custom_flush_interval,omitempty"` + MaxRetryAttempts uint `json:"max_retry_attempts,omitempty"` } type InfluxAsyncSink struct { @@ -101,6 +104,11 @@ func (s *InfluxAsyncSink) connect() error { if !ok { return fmt.Errorf("connection to %s not healthy", uri) } + s.writeApi.SetWriteFailedCallback(func(batch string, err influxdb2ApiHttp.Error, retryAttempts uint) bool { + mlist := strings.Split(batch, "\n") + cclog.ComponentError(s.name, fmt.Sprintf("Failed to write batch with %d metrics %d times (max: %d): %s", len(mlist), retryAttempts, s.config.MaxRetryAttempts, err.Error())) + return retryAttempts <= s.config.MaxRetryAttempts + }) return nil } @@ -111,7 +119,6 @@ func (s *InfluxAsyncSink) Write(m lp.CCMetric) error { if err := s.Flush(); err != nil { cclog.ComponentError(s.name, "flush failed:", err.Error()) } - }) } s.writeApi.WritePoint( @@ -150,6 +157,7 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { s.config.FlushInterval = 0 s.config.CustomFlushInterval = "" s.customFlushInterval = time.Duration(0) + s.config.MaxRetryAttempts = 1 // Default retry intervals (in seconds) // 1 2