mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-01-13 23:59:13 +01:00
Add drop rate, when send buffer is full
This commit is contained in:
parent
a3ac8f2ead
commit
b7dcbaebcf
@ -29,17 +29,26 @@ type InfluxSink struct {
|
|||||||
Password string `json:"password,omitempty"`
|
Password string `json:"password,omitempty"`
|
||||||
Organization string `json:"organization,omitempty"`
|
Organization string `json:"organization,omitempty"`
|
||||||
SSL bool `json:"ssl,omitempty"`
|
SSL bool `json:"ssl,omitempty"`
|
||||||
// Maximum number of points sent to server in single request. Default 100
|
// Maximum number of points sent to server in single request.
|
||||||
|
// Default: 1000
|
||||||
BatchSize int `json:"batch_size,omitempty"`
|
BatchSize int `json:"batch_size,omitempty"`
|
||||||
// Interval, in which is buffer flushed if it has not been already written (by reaching batch size). Default 1s
|
// Time interval for delayed sending of metrics.
|
||||||
|
// If the buffers are already filled before the end of this interval,
|
||||||
|
// the metrics are sent without further delay.
|
||||||
|
// Default: 1s
|
||||||
FlushInterval string `json:"flush_delay,omitempty"`
|
FlushInterval string `json:"flush_delay,omitempty"`
|
||||||
|
// Time interval after which sending of the metrics is retried after a failed sending.
|
||||||
|
// Default: 5s
|
||||||
RetryInterval string `json:"retry_delay,omitempty"`
|
RetryInterval string `json:"retry_delay,omitempty"`
|
||||||
|
// Number of metrics that are dropped when buffer is full
|
||||||
|
// Default: 100
|
||||||
|
DropRate int `json:"drop_rate,omitempty"`
|
||||||
}
|
}
|
||||||
batch []*write.Point
|
batch []*write.Point
|
||||||
flushTimer *time.Timer
|
flushTimer *time.Timer
|
||||||
flushDelay time.Duration
|
flushDelay time.Duration
|
||||||
retryDelay time.Duration
|
retryDelay time.Duration
|
||||||
lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
|
batchMutex sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
// connect connects to the InfluxDB server
|
// connect connects to the InfluxDB server
|
||||||
@ -64,7 +73,10 @@ func (s *InfluxSink) connect() error {
|
|||||||
} else {
|
} else {
|
||||||
auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password)
|
auth = fmt.Sprintf("%s:%s", s.config.User, s.config.Password)
|
||||||
}
|
}
|
||||||
cclog.ComponentDebug(s.name, "Using URI", uri, "Org", s.config.Organization, "Bucket", s.config.Database)
|
cclog.ComponentDebug(s.name,
|
||||||
|
"Using URI='"+uri+"'",
|
||||||
|
"Org='"+s.config.Organization+"'",
|
||||||
|
"Bucket='"+s.config.Database+"'")
|
||||||
|
|
||||||
// Set influxDB client options
|
// Set influxDB client options
|
||||||
clientOptions := influxdb2.DefaultOptions()
|
clientOptions := influxdb2.DefaultOptions()
|
||||||
@ -95,7 +107,7 @@ func (s *InfluxSink) connect() error {
|
|||||||
|
|
||||||
func (s *InfluxSink) Write(m lp.CCMetric) error {
|
func (s *InfluxSink) Write(m lp.CCMetric) error {
|
||||||
// Lock access to batch slice
|
// Lock access to batch slice
|
||||||
s.lock.Lock()
|
s.batchMutex.Lock()
|
||||||
|
|
||||||
if len(s.batch) == 0 && s.flushDelay != 0 {
|
if len(s.batch) == 0 && s.flushDelay != 0 {
|
||||||
// This is the first write since the last flush, start the flushTimer!
|
// This is the first write since the last flush, start the flushTimer!
|
||||||
@ -108,21 +120,24 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
|
|||||||
s.flushDelay,
|
s.flushDelay,
|
||||||
func() {
|
func() {
|
||||||
if err := s.Flush(); err != nil {
|
if err := s.Flush(); err != nil {
|
||||||
cclog.ComponentError(s.name, "flush failed:", err.Error())
|
cclog.ComponentError(s.name, "Flush timer: flush failed:", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// batch slice full, dropping oldest metric
|
// batch slice full, dropping oldest metric(s)
|
||||||
// e.g. when previous flushes failed and batch slice was not cleared
|
// e.g. when previous flushes failed and batch slice was not cleared
|
||||||
if len(s.batch) == s.config.BatchSize {
|
if len(s.batch) == s.config.BatchSize {
|
||||||
newSize := len(s.batch) - 1
|
newSize := s.config.BatchSize - s.config.DropRate
|
||||||
|
|
||||||
for i := 0; i < newSize; i++ {
|
for i := 0; i < newSize; i++ {
|
||||||
s.batch[i] = s.batch[i+1]
|
s.batch[i] = s.batch[i+s.config.DropRate]
|
||||||
|
}
|
||||||
|
for i := newSize; i < s.config.BatchSize; i++ {
|
||||||
|
s.batch[i] = nil
|
||||||
}
|
}
|
||||||
s.batch[newSize] = nil
|
|
||||||
s.batch = s.batch[:newSize]
|
s.batch = s.batch[:newSize]
|
||||||
cclog.ComponentError(s.name, "Batch slice full, dropping oldest metric")
|
cclog.ComponentError(s.name, "Batch slice full, dropping ", s.config.DropRate, "oldest metric(s)")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Append metric to batch slice
|
// Append metric to batch slice
|
||||||
@ -135,21 +150,22 @@ func (s *InfluxSink) Write(m lp.CCMetric) error {
|
|||||||
if s.flushDelay == 0 ||
|
if s.flushDelay == 0 ||
|
||||||
len(s.batch) == s.config.BatchSize {
|
len(s.batch) == s.config.BatchSize {
|
||||||
// Unlock access to batch slice
|
// Unlock access to batch slice
|
||||||
s.lock.Unlock()
|
s.batchMutex.Unlock()
|
||||||
return s.Flush()
|
return s.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unlock access to batch slice
|
// Unlock access to batch slice
|
||||||
s.lock.Unlock()
|
s.batchMutex.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush sends all metrics buffered in batch slice to InfluxDB server
|
// Flush sends all metrics buffered in batch slice to InfluxDB server
|
||||||
func (s *InfluxSink) Flush() error {
|
func (s *InfluxSink) Flush() error {
|
||||||
|
cclog.ComponentDebug(s.name, "Flushing")
|
||||||
|
|
||||||
// Lock access to batch slice
|
// Lock access to batch slice
|
||||||
s.lock.Lock()
|
s.batchMutex.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.batchMutex.Unlock()
|
||||||
|
|
||||||
// Nothing to do, batch slice is empty
|
// Nothing to do, batch slice is empty
|
||||||
if len(s.batch) == 0 {
|
if len(s.batch) == 0 {
|
||||||
@ -160,16 +176,17 @@ func (s *InfluxSink) Flush() error {
|
|||||||
err := s.writeApi.WritePoint(context.Background(), s.batch...)
|
err := s.writeApi.WritePoint(context.Background(), s.batch...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
|
cclog.ComponentError(s.name, "Flush(): Flush of", len(s.batch), "metrics failed:", err)
|
||||||
|
|
||||||
// Setup timer to retry flush
|
// Setup timer to retry flush
|
||||||
time.AfterFunc(
|
time.AfterFunc(
|
||||||
s.retryDelay,
|
s.retryDelay,
|
||||||
func() {
|
func() {
|
||||||
if err := s.Flush(); err != nil {
|
if err := s.Flush(); err != nil {
|
||||||
cclog.ComponentError(s.name, "flush retry failed:", err.Error())
|
cclog.ComponentError(s.name, "Retry timer: Flush failed:", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
cclog.ComponentError(s.name, "flush failed:", err.Error())
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -186,6 +203,9 @@ func (s *InfluxSink) Close() {
|
|||||||
cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
|
cclog.ComponentDebug(s.name, "Closing InfluxDB connection")
|
||||||
s.flushTimer.Stop()
|
s.flushTimer.Stop()
|
||||||
s.Flush()
|
s.Flush()
|
||||||
|
if err := s.Flush(); err != nil {
|
||||||
|
cclog.ComponentError(s.name, "Close(): Flush failed:", err)
|
||||||
|
}
|
||||||
s.client.Close()
|
s.client.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -195,32 +215,33 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
s.name = fmt.Sprintf("InfluxSink(%s)", name)
|
s.name = fmt.Sprintf("InfluxSink(%s)", name)
|
||||||
|
|
||||||
// Set config default values
|
// Set config default values
|
||||||
s.config.BatchSize = 100
|
s.config.BatchSize = 1000
|
||||||
s.config.FlushInterval = "1s"
|
s.config.FlushInterval = "1s"
|
||||||
s.config.RetryInterval = "5s"
|
s.config.RetryInterval = "5s"
|
||||||
|
s.config.DropRate = 100
|
||||||
|
|
||||||
// Read config
|
// Read config
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
err := json.Unmarshal(config, &s.config)
|
err := json.Unmarshal(config, &s.config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return s, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(s.config.Host) == 0 {
|
if len(s.config.Host) == 0 {
|
||||||
return nil, errors.New("Missing host configuration required by InfluxSink")
|
return s, errors.New("Missing host configuration required by InfluxSink")
|
||||||
}
|
}
|
||||||
if len(s.config.Port) == 0 {
|
if len(s.config.Port) == 0 {
|
||||||
return nil, errors.New("Missing port configuration required by InfluxSink")
|
return s, errors.New("Missing port configuration required by InfluxSink")
|
||||||
}
|
}
|
||||||
if len(s.config.Database) == 0 {
|
if len(s.config.Database) == 0 {
|
||||||
return nil, errors.New("Missing database configuration required by InfluxSink")
|
return s, errors.New("Missing database configuration required by InfluxSink")
|
||||||
}
|
}
|
||||||
if len(s.config.Organization) == 0 {
|
if len(s.config.Organization) == 0 {
|
||||||
return nil, errors.New("Missing organization configuration required by InfluxSink")
|
return s, errors.New("Missing organization configuration required by InfluxSink")
|
||||||
}
|
}
|
||||||
if len(s.config.Password) == 0 {
|
if len(s.config.Password) == 0 {
|
||||||
return nil, errors.New("Missing password configuration required by InfluxSink")
|
return s, errors.New("Missing password configuration required by InfluxSink")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create lookup map to use meta infos as tags in the output metric
|
// Create lookup map to use meta infos as tags in the output metric
|
||||||
@ -245,12 +266,24 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !(s.config.BatchSize > 0) {
|
||||||
|
return s, fmt.Errorf("batch_size=%d in InfluxDB config must be > 0", s.config.BatchSize)
|
||||||
|
}
|
||||||
|
if !(s.config.DropRate > 0) {
|
||||||
|
return s, fmt.Errorf("drop_rate=%d in InfluxDB config must be > 0", s.config.DropRate)
|
||||||
|
}
|
||||||
|
if !(s.config.BatchSize > s.config.DropRate) {
|
||||||
|
return s, fmt.Errorf(
|
||||||
|
"batch_size=%d must be greater then drop_rate=%d in InfluxDB config",
|
||||||
|
s.config.BatchSize, s.config.DropRate)
|
||||||
|
}
|
||||||
|
|
||||||
// allocate batch slice
|
// allocate batch slice
|
||||||
s.batch = make([]*write.Point, 0, s.config.BatchSize)
|
s.batch = make([]*write.Point, 0, s.config.BatchSize)
|
||||||
|
|
||||||
// Connect to InfluxDB server
|
// Connect to InfluxDB server
|
||||||
if err := s.connect(); err != nil {
|
if err := s.connect(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to connect: %v", err)
|
return s, fmt.Errorf("unable to connect: %v", err)
|
||||||
}
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user