From b732b2d7399dbebaca85cff218c328d8835f325b Mon Sep 17 00:00:00 2001 From: Lou Date: Wed, 8 Jun 2022 14:12:35 +0200 Subject: [PATCH] Improved http sink (#78) * automatic flush in NatsSink * tweak default options of HttpSink * shorter cirt. section and retries for HttpSink * fix error handling * Remove file added by mistake. * Use http instead of ftp to download likwid * Fix serial number in rocmCollector Co-authored-by: Thomas Roehl --- sinks/httpSink.go | 107 +++++++++++++++++++++++----------------------- sinks/natsSink.go | 76 ++++++++++++++++++++++---------- 2 files changed, 107 insertions(+), 76 deletions(-) diff --git a/sinks/httpSink.go b/sinks/httpSink.go index 7713638..466915d 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -19,9 +19,9 @@ type HttpSinkConfig struct { URL string `json:"url,omitempty"` JWT string `json:"jwt,omitempty"` Timeout string `json:"timeout,omitempty"` - MaxIdleConns int `json:"max_idle_connections,omitempty"` IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` FlushDelay string `json:"flush_delay,omitempty"` + MaxRetries int `json:"max_retries,omitempty"` } type HttpSink struct { @@ -32,83 +32,85 @@ type HttpSink struct { buffer *bytes.Buffer flushTimer *time.Timer config HttpSinkConfig - maxIdleConns int idleConnTimeout time.Duration timeout time.Duration flushDelay time.Duration } func (s *HttpSink) Write(m lp.CCMetric) error { - if s.buffer.Len() == 0 && s.flushDelay != 0 { - // This is the first write since the last flush, start the flushTimer! - if s.flushTimer != nil && s.flushTimer.Stop() { - cclog.ComponentDebug(s.name, "unexpected: the flushTimer was already running?") - } - - // Run a batched flush for all lines that have arrived in the last second - s.flushTimer = time.AfterFunc(s.flushDelay, func() { - if err := s.Flush(); err != nil { - cclog.ComponentError(s.name, "flush failed:", err.Error()) - } - }) - } - p := m.ToPoint(s.meta_as_tags) - s.lock.Lock() + firstWriteOfBatch := s.buffer.Len() == 0 _, err := s.encoder.Encode(p) - s.lock.Unlock() // defer does not work here as Flush() takes the lock as well - + s.lock.Unlock() if err != nil { cclog.ComponentError(s.name, "encoding failed:", err.Error()) return err } - // Flush synchronously if "flush_delay" is zero if s.flushDelay == 0 { return s.Flush() } - return err + if firstWriteOfBatch { + if s.flushTimer == nil { + s.flushTimer = time.AfterFunc(s.flushDelay, func() { + if err := s.Flush(); err != nil { + cclog.ComponentError(s.name, "flush failed:", err.Error()) + } + }) + } else { + s.flushTimer.Reset(s.flushDelay) + } + } + + return nil } func (s *HttpSink) Flush() error { - // buffer is read by client.Do, prevent concurrent modifications + // Own lock for as short as possible: the time it takes to copy the buffer. s.lock.Lock() - defer s.lock.Unlock() - - // Do not flush empty buffer - if s.buffer.Len() == 0 { + buf := make([]byte, s.buffer.Len()) + copy(buf, s.buffer.Bytes()) + s.buffer.Reset() + s.lock.Unlock() + if len(buf) == 0 { return nil } - // Create new request to send buffer - req, err := http.NewRequest(http.MethodPost, s.config.URL, s.buffer) - if err != nil { - cclog.ComponentError(s.name, "failed to create request:", err.Error()) - return err + var res *http.Response + for i := 0; i < s.config.MaxRetries; i++ { + // Create new request to send buffer + req, err := http.NewRequest(http.MethodPost, s.config.URL, bytes.NewReader(buf)) + if err != nil { + cclog.ComponentError(s.name, "failed to create request:", err.Error()) + return err + } + + // Set authorization header + if len(s.config.JWT) != 0 { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT)) + } + + // Do request + res, err = s.client.Do(req) + if err != nil { + cclog.ComponentError(s.name, "transport/tcp error:", err.Error()) + // Wait between retries + time.Sleep(time.Duration(i+1) * (time.Second / 2)) + continue + } + + break } - // Set authorization header - if len(s.config.JWT) != 0 { - req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.config.JWT)) - } - - // Send - res, err := s.client.Do(req) - - // Clear buffer - s.buffer.Reset() - - // Handle transport/tcp errors - if err != nil { - cclog.ComponentError(s.name, "transport/tcp error:", err.Error()) - return err + if res == nil { + return errors.New("flush failed due to repeated errors") } // Handle application errors if res.StatusCode != http.StatusOK { - err = errors.New(res.Status) + err := errors.New(res.Status) cclog.ComponentError(s.name, "application error:", err.Error()) return err } @@ -128,10 +130,10 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s := new(HttpSink) // Set default values s.name = fmt.Sprintf("HttpSink(%s)", name) - s.config.MaxIdleConns = 10 - s.config.IdleConnTimeout = "5s" + s.config.IdleConnTimeout = "120s" // should be larger than the measurement interval. s.config.Timeout = "5s" - s.config.FlushDelay = "1s" + s.config.FlushDelay = "5s" + s.config.MaxRetries = 3 // Read config if len(config) > 0 { @@ -143,9 +145,6 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { if len(s.config.URL) == 0 { return nil, errors.New("`url` config option is required for HTTP sink") } - if s.config.MaxIdleConns > 0 { - s.maxIdleConns = s.config.MaxIdleConns - } if len(s.config.IdleConnTimeout) > 0 { t, err := time.ParseDuration(s.config.IdleConnTimeout) if err == nil { @@ -170,7 +169,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s.meta_as_tags[k] = true } tr := &http.Transport{ - MaxIdleConns: s.maxIdleConns, + MaxIdleConns: 1, // We will only ever talk to one host. IdleConnTimeout: s.idleConnTimeout, } s.client = &http.Client{Transport: tr, Timeout: s.timeout} diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 0597e9b..4d43454 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "sync" "time" cclog "github.com/ClusterCockpit/cc-metric-collector/internal/ccLogger" @@ -15,11 +16,12 @@ import ( type NatsSinkConfig struct { defaultSinkConfig - Host string `json:"host,omitempty"` - Port string `json:"port,omitempty"` - Database string `json:"database,omitempty"` - User string `json:"user,omitempty"` - Password string `json:"password,omitempty"` + Host string `json:"host,omitempty"` + Port string `json:"port,omitempty"` + Subject string `json:"subject,omitempty"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` + FlushDelay string `json:"flush_delay,omitempty"` } type NatsSink struct { @@ -28,6 +30,10 @@ type NatsSink struct { encoder *influx.Encoder buffer *bytes.Buffer config NatsSinkConfig + + lock sync.Mutex + flushDelay time.Duration + flushTimer *time.Timer } func (s *NatsSink) connect() error { @@ -54,37 +60,53 @@ func (s *NatsSink) connect() error { } func (s *NatsSink) Write(m lp.CCMetric) error { - if s.client != nil { - _, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags)) - if err != nil { - cclog.ComponentError(s.name, "Write:", err.Error()) - return err - } + s.lock.Lock() + _, err := s.encoder.Encode(m.ToPoint(s.meta_as_tags)) + s.lock.Unlock() + if err != nil { + cclog.ComponentError(s.name, "Write:", err.Error()) + return err } + + if s.flushDelay == 0 { + s.Flush() + } else if s.flushTimer == nil { + s.flushTimer = time.AfterFunc(s.flushDelay, func() { + s.Flush() + }) + } else { + s.flushTimer.Reset(s.flushDelay) + } + return nil } func (s *NatsSink) Flush() error { - if s.client != nil { - if err := s.client.Publish(s.config.Database, s.buffer.Bytes()); err != nil { - cclog.ComponentError(s.name, "Flush:", err.Error()) - return err - } - s.buffer.Reset() + s.lock.Lock() + buf := append([]byte{}, s.buffer.Bytes()...) // copy bytes + s.buffer.Reset() + s.lock.Unlock() + + if len(buf) == 0 { + return nil + } + + if err := s.client.Publish(s.config.Subject, buf); err != nil { + cclog.ComponentError(s.name, "Flush:", err.Error()) + return err } return nil } func (s *NatsSink) Close() { - if s.client != nil { - cclog.ComponentDebug(s.name, "Close") - s.client.Close() - } + cclog.ComponentDebug(s.name, "Close") + s.client.Close() } func NewNatsSink(name string, config json.RawMessage) (Sink, error) { s := new(NatsSink) s.name = fmt.Sprintf("NatsSink(%s)", name) + s.flushDelay = 10 * time.Second if len(config) > 0 { err := json.Unmarshal(config, &s.config) if err != nil { @@ -94,7 +116,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { } if len(s.config.Host) == 0 || len(s.config.Port) == 0 || - len(s.config.Database) == 0 { + len(s.config.Subject) == 0 { return nil, errors.New("not all configuration variables set required by NatsSink") } // Create lookup map to use meta infos as tags in the output metric @@ -112,5 +134,15 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { if err := s.connect(); err != nil { return nil, fmt.Errorf("unable to connect: %v", err) } + + s.flushTimer = nil + if len(s.config.FlushDelay) != 0 { + var err error + s.flushDelay, err = time.ParseDuration(s.config.FlushDelay) + if err != nil { + return nil, err + } + } + return s, nil }