diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 3cdae62..d81434e 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -16,7 +16,7 @@ import ( type HttpSinkConfig struct { defaultSinkConfig - URL string `json:"url,omitempty"` + URL string `json:"url"` JWT string `json:"jwt,omitempty"` Timeout string `json:"timeout,omitempty"` IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` @@ -26,10 +26,10 @@ type HttpSinkConfig struct { type HttpSink struct { sink - client *http.Client - encoder *influx.Encoder - lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer - buffer *bytes.Buffer + client *http.Client + encoder influx.Encoder + lock sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer + //buffer *bytes.Buffer flushTimer *time.Timer config HttpSinkConfig idleConnTimeout time.Duration @@ -38,20 +38,29 @@ type HttpSink struct { } func (s *HttpSink) Write(m lp.CCMetric) error { + var err error = nil + var firstWriteOfBatch bool = false p := m.ToPoint(s.meta_as_tags) s.lock.Lock() - firstWriteOfBatch := s.buffer.Len() == 0 - s.encoder.StartLine(p.Name()) - for _, v := range p.TagList() { - s.encoder.AddTag(v.Key, v.Value) + firstWriteOfBatch = len(s.encoder.Bytes()) == 0 + v, ok := m.GetField("value") + if ok { + + s.encoder.StartLine(p.Name()) + for _, v := range p.TagList() { + s.encoder.AddTag(v.Key, v.Value) + } + + s.encoder.AddField("value", influx.MustNewValue(v)) + s.encoder.EndLine(p.Time()) + err = s.encoder.Err() + if err != nil { + cclog.ComponentError(s.name, "encoding failed:", err.Error()) + s.lock.Unlock() + return err + } } - s.encoder.EndLine(p.Time()) - err := s.encoder.Err() s.lock.Unlock() - if err != nil { - cclog.ComponentError(s.name, "encoding failed:", err.Error()) - return err - } if s.flushDelay == 0 { return s.Flush() @@ -75,9 +84,9 @@ func (s *HttpSink) Write(m lp.CCMetric) error { func (s *HttpSink) Flush() error { // Own lock for as short as possible: the time it takes to copy the buffer. s.lock.Lock() - buf := make([]byte, s.buffer.Len()) - copy(buf, s.buffer.Bytes()) - s.buffer.Reset() + buf := make([]byte, len(s.encoder.Bytes())) + copy(buf, s.encoder.Bytes()) + s.encoder.Reset() s.lock.Unlock() if len(buf) == 0 { return nil @@ -139,6 +148,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s.config.Timeout = "5s" s.config.FlushDelay = "5s" s.config.MaxRetries = 3 + cclog.ComponentDebug(s.name, "init") // Read config if len(config) > 0 { @@ -153,6 +163,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { if len(s.config.IdleConnTimeout) > 0 { t, err := time.ParseDuration(s.config.IdleConnTimeout) if err == nil { + cclog.ComponentDebug(s.name, "idleConnTimeout", t) s.idleConnTimeout = t } } @@ -160,12 +171,14 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { t, err := time.ParseDuration(s.config.Timeout) if err == nil { s.timeout = t + cclog.ComponentDebug(s.name, "timeout", t) } } if len(s.config.FlushDelay) > 0 { t, err := time.ParseDuration(s.config.FlushDelay) if err == nil { s.flushDelay = t + cclog.ComponentDebug(s.name, "flushDelay", t) } } // Create lookup map to use meta infos as tags in the output metric @@ -178,8 +191,6 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { IdleConnTimeout: s.idleConnTimeout, } s.client = &http.Client{Transport: tr, Timeout: s.timeout} - s.buffer = &bytes.Buffer{} s.encoder.SetPrecision(influx.Second) - s.encoder.SetBuffer(s.buffer.Bytes()) return s, nil }