mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-31 00:55:06 +01:00 
			
		
		
		
	Use not a pointer to line-protocol.Encoder
This commit is contained in:
		| @@ -16,7 +16,7 @@ import ( | |||||||
|  |  | ||||||
| type HttpSinkConfig struct { | type HttpSinkConfig struct { | ||||||
| 	defaultSinkConfig | 	defaultSinkConfig | ||||||
| 	URL             string `json:"url,omitempty"` | 	URL             string `json:"url"` | ||||||
| 	JWT             string `json:"jwt,omitempty"` | 	JWT             string `json:"jwt,omitempty"` | ||||||
| 	Timeout         string `json:"timeout,omitempty"` | 	Timeout         string `json:"timeout,omitempty"` | ||||||
| 	IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` | 	IdleConnTimeout string `json:"idle_connection_timeout,omitempty"` | ||||||
| @@ -26,10 +26,10 @@ type HttpSinkConfig struct { | |||||||
|  |  | ||||||
| type HttpSink struct { | type HttpSink struct { | ||||||
| 	sink | 	sink | ||||||
| 	client          *http.Client | 	client  *http.Client | ||||||
| 	encoder         *influx.Encoder | 	encoder influx.Encoder | ||||||
| 	lock            sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer | 	lock    sync.Mutex // Flush() runs in another goroutine, so this lock has to protect the buffer | ||||||
| 	buffer          *bytes.Buffer | 	//buffer          *bytes.Buffer | ||||||
| 	flushTimer      *time.Timer | 	flushTimer      *time.Timer | ||||||
| 	config          HttpSinkConfig | 	config          HttpSinkConfig | ||||||
| 	idleConnTimeout time.Duration | 	idleConnTimeout time.Duration | ||||||
| @@ -38,20 +38,29 @@ type HttpSink struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (s *HttpSink) Write(m lp.CCMetric) error { | func (s *HttpSink) Write(m lp.CCMetric) error { | ||||||
|  | 	var err error = nil | ||||||
|  | 	var firstWriteOfBatch bool = false | ||||||
| 	p := m.ToPoint(s.meta_as_tags) | 	p := m.ToPoint(s.meta_as_tags) | ||||||
| 	s.lock.Lock() | 	s.lock.Lock() | ||||||
| 	firstWriteOfBatch := s.buffer.Len() == 0 | 	firstWriteOfBatch = len(s.encoder.Bytes()) == 0 | ||||||
| 	s.encoder.StartLine(p.Name()) | 	v, ok := m.GetField("value") | ||||||
| 	for _, v := range p.TagList() { | 	if ok { | ||||||
| 		s.encoder.AddTag(v.Key, v.Value) |  | ||||||
|  | 		s.encoder.StartLine(p.Name()) | ||||||
|  | 		for _, v := range p.TagList() { | ||||||
|  | 			s.encoder.AddTag(v.Key, v.Value) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		s.encoder.AddField("value", influx.MustNewValue(v)) | ||||||
|  | 		s.encoder.EndLine(p.Time()) | ||||||
|  | 		err = s.encoder.Err() | ||||||
|  | 		if err != nil { | ||||||
|  | 			cclog.ComponentError(s.name, "encoding failed:", err.Error()) | ||||||
|  | 			s.lock.Unlock() | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 	s.encoder.EndLine(p.Time()) |  | ||||||
| 	err := s.encoder.Err() |  | ||||||
| 	s.lock.Unlock() | 	s.lock.Unlock() | ||||||
| 	if err != nil { |  | ||||||
| 		cclog.ComponentError(s.name, "encoding failed:", err.Error()) |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if s.flushDelay == 0 { | 	if s.flushDelay == 0 { | ||||||
| 		return s.Flush() | 		return s.Flush() | ||||||
| @@ -75,9 +84,9 @@ func (s *HttpSink) Write(m lp.CCMetric) error { | |||||||
| func (s *HttpSink) Flush() error { | func (s *HttpSink) Flush() error { | ||||||
| 	// Own lock for as short as possible: the time it takes to copy the buffer. | 	// Own lock for as short as possible: the time it takes to copy the buffer. | ||||||
| 	s.lock.Lock() | 	s.lock.Lock() | ||||||
| 	buf := make([]byte, s.buffer.Len()) | 	buf := make([]byte, len(s.encoder.Bytes())) | ||||||
| 	copy(buf, s.buffer.Bytes()) | 	copy(buf, s.encoder.Bytes()) | ||||||
| 	s.buffer.Reset() | 	s.encoder.Reset() | ||||||
| 	s.lock.Unlock() | 	s.lock.Unlock() | ||||||
| 	if len(buf) == 0 { | 	if len(buf) == 0 { | ||||||
| 		return nil | 		return nil | ||||||
| @@ -139,6 +148,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { | |||||||
| 	s.config.Timeout = "5s" | 	s.config.Timeout = "5s" | ||||||
| 	s.config.FlushDelay = "5s" | 	s.config.FlushDelay = "5s" | ||||||
| 	s.config.MaxRetries = 3 | 	s.config.MaxRetries = 3 | ||||||
|  | 	cclog.ComponentDebug(s.name, "init") | ||||||
|  |  | ||||||
| 	// Read config | 	// Read config | ||||||
| 	if len(config) > 0 { | 	if len(config) > 0 { | ||||||
| @@ -153,6 +163,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { | |||||||
| 	if len(s.config.IdleConnTimeout) > 0 { | 	if len(s.config.IdleConnTimeout) > 0 { | ||||||
| 		t, err := time.ParseDuration(s.config.IdleConnTimeout) | 		t, err := time.ParseDuration(s.config.IdleConnTimeout) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
|  | 			cclog.ComponentDebug(s.name, "idleConnTimeout", t) | ||||||
| 			s.idleConnTimeout = t | 			s.idleConnTimeout = t | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| @@ -160,12 +171,14 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { | |||||||
| 		t, err := time.ParseDuration(s.config.Timeout) | 		t, err := time.ParseDuration(s.config.Timeout) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			s.timeout = t | 			s.timeout = t | ||||||
|  | 			cclog.ComponentDebug(s.name, "timeout", t) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	if len(s.config.FlushDelay) > 0 { | 	if len(s.config.FlushDelay) > 0 { | ||||||
| 		t, err := time.ParseDuration(s.config.FlushDelay) | 		t, err := time.ParseDuration(s.config.FlushDelay) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			s.flushDelay = t | 			s.flushDelay = t | ||||||
|  | 			cclog.ComponentDebug(s.name, "flushDelay", t) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	// Create lookup map to use meta infos as tags in the output metric | 	// Create lookup map to use meta infos as tags in the output metric | ||||||
| @@ -178,8 +191,6 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { | |||||||
| 		IdleConnTimeout: s.idleConnTimeout, | 		IdleConnTimeout: s.idleConnTimeout, | ||||||
| 	} | 	} | ||||||
| 	s.client = &http.Client{Transport: tr, Timeout: s.timeout} | 	s.client = &http.Client{Transport: tr, Timeout: s.timeout} | ||||||
| 	s.buffer = &bytes.Buffer{} |  | ||||||
| 	s.encoder.SetPrecision(influx.Second) | 	s.encoder.SetPrecision(influx.Second) | ||||||
| 	s.encoder.SetBuffer(s.buffer.Bytes()) |  | ||||||
| 	return s, nil | 	return s, nil | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user