Merge branch 'develop' into main

This commit is contained in:
Thomas Roehl 2022-04-04 11:49:33 +02:00
commit 229a57b16a

View File

@ -6,12 +6,14 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"strings"
"time" "time"
cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
influxdb2ApiHttp "github.com/influxdata/influxdb-client-go/v2/api/http"
) )
type InfluxAsyncSinkConfig struct { type InfluxAsyncSinkConfig struct {
@ -33,6 +35,7 @@ type InfluxAsyncSinkConfig struct {
InfluxMaxRetries uint `json:"max_retries,omitempty"` InfluxMaxRetries uint `json:"max_retries,omitempty"`
InfluxMaxRetryTime string `json:"max_retry_time,omitempty"` InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
CustomFlushInterval string `json:"custom_flush_interval,omitempty"` CustomFlushInterval string `json:"custom_flush_interval,omitempty"`
MaxRetryAttempts uint `json:"max_retry_attempts,omitempty"`
} }
type InfluxAsyncSink struct { type InfluxAsyncSink struct {
@ -101,6 +104,11 @@ func (s *InfluxAsyncSink) connect() error {
if !ok { if !ok {
return fmt.Errorf("connection to %s not healthy", uri) return fmt.Errorf("connection to %s not healthy", uri)
} }
s.writeApi.SetWriteFailedCallback(func(batch string, err influxdb2ApiHttp.Error, retryAttempts uint) bool {
mlist := strings.Split(batch, "\n")
cclog.ComponentError(s.name, fmt.Sprintf("Failed to write batch with %d metrics %d times (max: %d): %s", len(mlist), retryAttempts, s.config.MaxRetryAttempts, err.Error()))
return retryAttempts <= s.config.MaxRetryAttempts
})
return nil return nil
} }
@ -111,7 +119,6 @@ func (s *InfluxAsyncSink) Write(m lp.CCMetric) error {
if err := s.Flush(); err != nil { if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error()) cclog.ComponentError(s.name, "flush failed:", err.Error())
} }
}) })
} }
s.writeApi.WritePoint( s.writeApi.WritePoint(
@ -150,6 +157,7 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
s.config.FlushInterval = 0 s.config.FlushInterval = 0
s.config.CustomFlushInterval = "" s.config.CustomFlushInterval = ""
s.customFlushInterval = time.Duration(0) s.customFlushInterval = time.Duration(0)
s.config.MaxRetryAttempts = 1
// Default retry intervals (in seconds) // Default retry intervals (in seconds)
// 1 2 // 1 2