shorter cirt. section and retries for HttpSink

This commit is contained in:
Lou Knauer 2022-06-02 12:02:20 +02:00
parent 1b6ace43d2
commit 7f97f7ec3b
3 changed files with 184 additions and 45 deletions

134
sinks/compactor.go Normal file
View File

@ -0,0 +1,134 @@
package sinks
import (
"errors"
"fmt"
"sort"
"time"
ilp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
lp "github.com/influxdata/line-protocol"
)
// A node in a tree structured by tags
type compactorEntry struct {
tagKey string // The tag key all points share in this sub-tree.
tagValue string // The tag value all points share in this sub-tree.
points map[int64][]ilp.CCMetric // The points with those keys (only), grouped by points where the time is the same.
entries map[string]*compactorEntry // Points with even more keys go here, grouped by those keys.
}
func (ce *compactorEntry) read(lines []ilp.CCMetric, tags []lp.Tag) ([]ilp.CCMetric, error) {
for t, points := range ce.points {
fields := make(map[string]interface{})
for _, point := range points {
if val, ok := point.GetField("value"); ok {
fields[point.Name()] = val
} else {
return nil, errors.New("only field expected is 'value'")
}
}
p, err := ilp.New("data", nil, nil, fields, time.Unix(t, 0))
if err != nil {
return nil, err
}
for _, tag := range tags {
p.AddTag(tag.Key, tag.Value)
}
lines = append(lines, p)
}
for _, e := range ce.entries {
var err error
lines, err = e.read(lines, append(tags, lp.Tag{
Key: e.tagKey,
Value: e.tagValue,
}))
if err != nil {
return nil, err
}
}
return lines, nil
}
type Compactor struct {
wrapped Sink
n int
root compactorEntry
}
var _ Sink = (*Compactor)(nil)
func NewCompactor(name string, wrapped Sink) (Sink, error) {
c := &Compactor{
wrapped: wrapped,
root: compactorEntry{
points: make(map[int64][]ilp.CCMetric),
entries: make(map[string]*compactorEntry),
},
}
return c, nil
}
func (c *Compactor) Write(point ilp.CCMetric) error {
taglist := make([]lp.Tag, 0)
for k, v := range point.Tags() {
taglist = append(taglist, lp.Tag{Key: k, Value: v})
}
sort.Slice(taglist, func(i, j int) bool {
a, b := taglist[i], taglist[j]
return a.Key < b.Key
})
e := &c.root
for _, tag := range taglist {
mapkey := tag.Key + ":" + tag.Value
ce, ok := e.entries[mapkey]
if !ok {
ce = &compactorEntry{
tagKey: tag.Key,
tagValue: tag.Value,
points: make(map[int64][]ilp.CCMetric),
entries: make(map[string]*compactorEntry),
}
e.entries[mapkey] = ce
}
e = ce
}
t := point.Time().Unix()
c.n += 1
e.points[t] = append(e.points[t], point)
return nil
}
func (c *Compactor) Flush() error {
points := make([]ilp.CCMetric, 0, c.n)
points, err := c.root.read(points, make([]lp.Tag, 0, 5))
if err != nil {
return err
}
for _, p := range points {
if err := c.wrapped.Write(p); err != nil {
return err
}
}
c.n = 0
c.root.points = make(map[int64][]ilp.CCMetric)
c.root.entries = make(map[string]*compactorEntry)
return c.wrapped.Flush()
}
func (c *Compactor) Close() {
c.wrapped.Close()
}
func (c *Compactor) Name() string {
return fmt.Sprintf("%s (compacted)", c.wrapped.Name())
}

View File

@ -21,6 +21,7 @@ type HttpSinkConfig struct {
Timeout string `json:"timeout,omitempty"`
IdleConnTimeout string `json:"idle_connection_timeout,omitempty"`
FlushDelay string `json:"flush_delay,omitempty"`
MaxRetries int `json:"max_retries,omitempty"`
}
type HttpSink struct {
@ -37,76 +38,75 @@ type HttpSink struct {
}
func (s *HttpSink) Write(m lp.CCMetric) error {
if s.buffer.Len() == 0 && s.flushDelay != 0 {
// This is the first write since the last flush, start the flushTimer!
if s.flushTimer != nil && s.flushTimer.Stop() {
cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?")
}
// Run a batched flush for all lines that have arrived in the last second
s.flushTimer = time.AfterFunc(s.flushDelay, func() {
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
}
})
}
p := m.ToPoint(s.meta_as_tags)
s.lock.Lock()
firstWriteOfBatch := s.buffer.Len() == 0
_, err := s.encoder.Encode(p)
s.lock.Unlock() // defer does not work here as Flush() takes the lock as well
s.lock.Unlock()
if err != nil {
cclog.ComponentError(s.name, "encoding failed:", err.Error())
return err
}
// Flush synchronously if "flush_delay" is zero
if s.flushDelay == 0 {
return s.Flush()
}
return err
if firstWriteOfBatch {
if s.flushTimer == nil {
s.flushTimer = time.AfterFunc(s.flushDelay, func() {
if err := s.Flush(); err != nil {
cclog.ComponentError(s.name, "flush failed:", err.Error())
}
})
} else {
s.flushTimer.Reset(s.flushDelay)
}
}
return nil
}
func (s *HttpSink) Flush() error {
// buffer is read by client.Do, prevent concurrent modifications
// Own lock for as short as possible: the time it takes to copy the buffer.
s.lock.Lock()
defer s.lock.Unlock()
// Do not flush empty buffer
if s.buffer.Len() == 0 {
buf := make([]byte, s.buffer.Len())
copy(buf, s.buffer.Bytes())
s.buffer.Reset()
s.lock.Unlock()
if len(buf) == 0 {
return nil
}
// Create new request to send buffer
req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer)
if err != nil {
cclog.ComponentError(s.name, "failed to create request:", err.Error())
return err
}
var res *http.Response
for i := 0; i < s.config.MaxRetries; i++ {
// Create new request to send buffer
req, err := http.NewRequest(http.MethodPost, s.config.URL, bytes.NewReader(buf))
if err != nil {
cclog.ComponentError(s.name, "failed to create request:", err.Error())
return err
}
// Set authorization header
if len(s.config.JWT) != 0 {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT))
}
// Set authorization header
if len(s.config.JWT) != 0 {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT))
}
// Send
res, err := s.client.Do(req)
// Do request
res, err = s.client.Do(req)
if err != nil {
cclog.ComponentError(s.name, "transport/tcp error:", err.Error())
// Wait between retries
time.Sleep(time.Duration(i+1) * time.Second)
continue
}
// Clear buffer
s.buffer.Reset()
// Handle transport/tcp errors
if err != nil {
cclog.ComponentError(s.name, "transport/tcp error:", err.Error())
return err
break
}
// Handle application errors
if res.StatusCode != http.StatusOK {
err = errors.New(res.Status)
err := errors.New(res.Status)
cclog.ComponentError(s.name, "application error:", err.Error())
return err
}
@ -129,6 +129,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
s.config.IdleConnTimeout = "120s" // should be larger than the measurement interval.
s.config.Timeout = "5s"
s.config.FlushDelay = "5s"
s.config.MaxRetries = 3
// Read config
if len(config) > 0 {

View File

@ -21,7 +21,7 @@ type NatsSinkConfig struct {
Subject string `json:"subject,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
FlushDelay string `json:"flush-delay,omitempty"`
FlushDelay string `json:"flush_delay,omitempty"`
}
type NatsSink struct {
@ -87,6 +87,10 @@ func (s *NatsSink) Flush() error {
s.buffer.Reset()
s.lock.Unlock()
if len(buf) == 0 {
return nil
}
if err := s.client.Publish(s.config.Subject, buf); err != nil {
cclog.ComponentError(s.name, "Flush:", err.Error())
return err