diff --git a/sinks/compactor.go b/sinks/compactor.go new file mode 100644 index 0000000..90dd2bb --- /dev/null +++ b/sinks/compactor.go @@ -0,0 +1,134 @@ +package sinks + +import ( + "errors" + "fmt" + "sort" + "time" + + ilp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric" + lp "github.com/influxdata/line-protocol" +) + +// A node in a tree structured by tags +type compactorEntry struct { + tagKey string // The tag key all points share in this sub-tree. + tagValue string // The tag value all points share in this sub-tree. + points map[int64][]ilp.CCMetric // The points with those keys (only), grouped by points where the time is the same. + entries map[string]*compactorEntry // Points with even more keys go here, grouped by those keys. +} + +func (ce *compactorEntry) read(lines []ilp.CCMetric, tags []lp.Tag) ([]ilp.CCMetric, error) { + for t, points := range ce.points { + fields := make(map[string]interface{}) + for _, point := range points { + if val, ok := point.GetField("value"); ok { + fields[point.Name()] = val + } else { + return nil, errors.New("only field expected is 'value'") + } + } + + p, err := ilp.New("data", nil, nil, fields, time.Unix(t, 0)) + if err != nil { + return nil, err + } + for _, tag := range tags { + p.AddTag(tag.Key, tag.Value) + } + + lines = append(lines, p) + } + + for _, e := range ce.entries { + var err error + lines, err = e.read(lines, append(tags, lp.Tag{ + Key: e.tagKey, + Value: e.tagValue, + })) + if err != nil { + return nil, err + } + } + + return lines, nil +} + +type Compactor struct { + wrapped Sink + n int + root compactorEntry +} + +var _ Sink = (*Compactor)(nil) + +func NewCompactor(name string, wrapped Sink) (Sink, error) { + c := &Compactor{ + wrapped: wrapped, + root: compactorEntry{ + points: make(map[int64][]ilp.CCMetric), + entries: make(map[string]*compactorEntry), + }, + } + return c, nil +} + +func (c *Compactor) Write(point ilp.CCMetric) error { + taglist := make([]lp.Tag, 0) + for k, v := 
range point.Tags() { + taglist = append(taglist, lp.Tag{Key: k, Value: v}) + } + + sort.Slice(taglist, func(i, j int) bool { + a, b := taglist[i], taglist[j] + return a.Key < b.Key + }) + + e := &c.root + for _, tag := range taglist { + mapkey := tag.Key + ":" + tag.Value + ce, ok := e.entries[mapkey] + if !ok { + ce = &compactorEntry{ + tagKey: tag.Key, + tagValue: tag.Value, + points: make(map[int64][]ilp.CCMetric), + entries: make(map[string]*compactorEntry), + } + e.entries[mapkey] = ce + } + e = ce + } + + t := point.Time().Unix() + c.n += 1 + e.points[t] = append(e.points[t], point) + return nil +} + +func (c *Compactor) Flush() error { + points := make([]ilp.CCMetric, 0, c.n) + points, err := c.root.read(points, make([]lp.Tag, 0, 5)) + if err != nil { + return err + } + + for _, p := range points { + if err := c.wrapped.Write(p); err != nil { + return err + } + } + + c.n = 0 + c.root.points = make(map[int64][]ilp.CCMetric) + c.root.entries = make(map[string]*compactorEntry) + return c.wrapped.Flush() +} + +func (c *Compactor) Close() { + c.wrapped.Close() +} + +func (c *Compactor) Name() string { + return fmt.Sprintf("%s (compacted)", c.wrapped.Name()) +} diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 962c2d7..c0a99ce 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -21,6 +21,7 @@ type HttpSinkConfig struct { Timeout string `json:"timeout,omitempty"` IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` FlushDelay string `json:"flush_delay,omitempty"` + MaxRetries int `json:"max_retries,omitempty"` } type HttpSink struct { @@ -37,76 +38,75 @@ type HttpSink struct { } func (s *HttpSink) Write(m lp.CCMetric) error { - if s.buffer.Len() == 0 && s.flushDelay != 0 { - // This is the first write since the last flush, start the flushTimer! 
- if s.flushTimer != nil && s.flushTimer.Stop() { - cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?") - } - - // Run a batched flush for all lines that have arrived in the last second - s.flushTimer = time.AfterFunc(s.flushDelay, func() { - if err := s.Flush(); err != nil { - cclog.ComponentError(s.name, "flush failed:", err.Error()) - } - }) - } - p := m.ToPoint(s.meta_as_tags) - s.lock.Lock() + firstWriteOfBatch := s.buffer.Len() == 0 _, err := s.encoder.Encode(p) - s.lock.Unlock() // defer does not work here as Flush() takes the lock as well - + s.lock.Unlock() if err != nil { cclog.ComponentError(s.name, "encoding failed:", err.Error()) return err } - // Flush synchronously if "flush_delay" is zero if s.flushDelay == 0 { return s.Flush() } - return err + if firstWriteOfBatch { + if s.flushTimer == nil { + s.flushTimer = time.AfterFunc(s.flushDelay, func() { + if err := s.Flush(); err != nil { + cclog.ComponentError(s.name, "flush failed:", err.Error()) + } + }) + } else { + s.flushTimer.Reset(s.flushDelay) + } + } + + return nil } func (s *HttpSink) Flush() error { - // buffer is read by client.Do, prevent concurrent modifications + // Own lock for as short as possible: the time it takes to copy the buffer. 
s.lock.Lock() - defer s.lock.Unlock() - - // Do not flush empty buffer - if s.buffer.Len() == 0 { + buf := make([]byte, s.buffer.Len()) + copy(buf, s.buffer.Bytes()) + s.buffer.Reset() + s.lock.Unlock() + if len(buf) == 0 { return nil } - // Create new request to send buffer - req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer) - if err != nil { - cclog.ComponentError(s.name, "failed to create request:", err.Error()) - return err - } + var res *http.Response + for i := 0; i < s.config.MaxRetries; i++ { + // Create new request to send buffer + req, err := http.NewRequest(http.MethodPost, s.config.URL, bytes.NewReader(buf)) + if err != nil { + cclog.ComponentError(s.name, "failed to create request:", err.Error()) + return err + } - // Set authorization header - if len(s.config.JWT) != 0 { - req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT)) - } + // Set authorization header + if len(s.config.JWT) != 0 { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT)) + } - // Send - res, err := s.client.Do(req) + // Do request + res, err = s.client.Do(req) + if err != nil { + cclog.ComponentError(s.name, "transport/tcp error:", err.Error()) + if i == s.config.MaxRetries-1 { return err } // all retries failed: return instead of dereferencing nil res below + time.Sleep(time.Duration(i+1) * time.Second) + continue + } - // Clear buffer - s.buffer.Reset() - - // Handle transport/tcp errors - if err != nil { - cclog.ComponentError(s.name, "transport/tcp error:", err.Error()) - return err + break } // Handle application errors if res.StatusCode != http.StatusOK { - err = errors.New(res.Status) + err := errors.New(res.Status) cclog.ComponentError(s.name, "application error:", err.Error()) return err } @@ -129,6 +129,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s.config.IdleConnTimeout = "120s" // should be larger than the measurement interval.
s.config.Timeout = "5s" s.config.FlushDelay = "5s" + s.config.MaxRetries = 3 // Read config if len(config) > 0 { diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 3e2e728..4d43454 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -21,7 +21,7 @@ type NatsSinkConfig struct { Subject string `json:"subject,omitempty"` User string `json:"user,omitempty"` Password string `json:"password,omitempty"` - FlushDelay string `json:"flush-delay,omitempty"` + FlushDelay string `json:"flush_delay,omitempty"` } type NatsSink struct { @@ -87,6 +87,10 @@ func (s *NatsSink) Flush() error { s.buffer.Reset() s.lock.Unlock() + if len(buf) == 0 { + return nil + } + if err := s.client.Publish(s.config.Subject, buf); err != nil { cclog.ComponentError(s.name, "Flush:", err.Error()) return err