Set WriteFailedCallback to get some error message

This commit is contained in:
Thomas Roehl 2022-04-04 11:48:54 +02:00
parent 69f7c19659
commit 70a9530aba

View File

@ -6,12 +6,14 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
influxdb2ApiHttp "github.com/influxdata/influxdb-client-go/v2/api/http"
)
type InfluxAsyncSinkConfig struct {
@ -33,6 +35,7 @@ type InfluxAsyncSinkConfig struct {
InfluxMaxRetries uint `json:"max_retries,omitempty"`
InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
CustomFlushInterval string `json:"custom_flush_interval,omitempty"`
MaxRetryAttempts uint `json:"max_retry_attempts,omitempty"`
}
type InfluxAsyncSink struct {
@ -101,6 +104,11 @@ func (s *InfluxAsyncSink) connect() error {
if !ok {
return fmt.Errorf("connection to %s not healthy", uri)
}
s.writeApi.SetWriteFailedCallback(func(batch string, err influxdb2ApiHttp.Error, retryAttempts uint) bool {
mlist := strings.Split(batch, "\n")
cclog.ComponentError(s.name, fmt.Sprintf("Failed to write batch with %d metrics %d times (max: %d): %s", len(mlist), retryAttempts, s.config.MaxRetryAttempts, err.Error()))
return retryAttempts <= s.config.MaxRetryAttempts
})
return nil
}
@ -111,7 +119,6 @@ func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
}
})
}
s.writeApi.WritePoint(
@ -150,6 +157,7 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
s.config.FlushInterval = 0
s.config.CustomFlushInterval = ""
s.customFlushInterval = time.Duration(0)
s.config.MaxRetryAttempts = 1
// Default retry intervals (in seconds)
// 1 2