From 74ebb5f48ff231978789d56789f06f5b6fa8f42e Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Thu, 19 Dec 2024 21:16:39 +0100 Subject: [PATCH] Add precision option to all sinks using Influx's encoder --- sinks/httpSink.go | 4 +- sinks/httpSink.md | 18 +++++---- sinks/influxAsyncSink.go | 20 +++++++++- sinks/influxAsyncSink.md | 14 ++++++- sinks/influxSink.go | 18 ++++++++- sinks/influxSink.md | 16 ++++++-- sinks/natsSink.go | 83 +++++++++++++++++++++++++++++++++++----- sinks/natsSink.md | 16 +++++++- 8 files changed, 160 insertions(+), 29 deletions(-) diff --git a/sinks/httpSink.go b/sinks/httpSink.go index b846cdc..44d6dea 100644 --- a/sinks/httpSink.go +++ b/sinks/httpSink.go @@ -276,7 +276,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s.config.Timeout = "5s" s.config.FlushDelay = "5s" s.config.MaxRetries = 3 - s.config.Precision = "ns" + s.config.Precision = "s" cclog.ComponentDebug(s.name, "Init()") // Read config @@ -339,7 +339,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) { s.mp.AddMoveMetaToTags("true", k, k) } - precision := influx.Nanosecond + precision := influx.Second if len(s.config.Precision) > 0 { switch s.config.Precision { case "s": diff --git a/sinks/httpSink.md b/sinks/httpSink.md index 7d77ddf..5c6aded 100644 --- a/sinks/httpSink.md +++ b/sinks/httpSink.md @@ -8,9 +8,6 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the { "": { "type": "http", - "meta_as_tags" : [ - "meta-key" - ], "url" : "https://my-monitoring.example.com:1234/api/write", "jwt" : "blabla.blabla.blabla", "username": "myUser", @@ -19,13 +16,16 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the "idle_connection_timeout" : "5s", "flush_delay": "2s", "batch_size": 1000, - "precision": "s" + "precision": "s", + "process_messages" : { + "see" : "docs of message processor for valid fields" + }, + "meta_as_tags" : [] } } ``` - `type`: makes the sink an `http` sink -- `meta_as_tags`: Move specific meta information to the tags in the output (optional) - `url`: The full URL of the endpoint - `jwt`: JSON web tokens for authentication (Using the *Bearer* scheme) - `username`: username for basic authentication @@ -35,8 +35,10 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the - `idle_connection_timeout`: Timeout for idle connections (default '120s'). Should be larger than the measurement interval to keep the connection open - `flush_delay`: Batch all writes arriving in during this duration (default '1s', batching can be disabled by setting it to 0) - `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay -- `precision`: Precision of the timestamp. Valid values are 's', 'ms', 'us' and 'ns'. (default is 'ns') +- `precision`: Precision of the timestamp. Valid values are 's', 'ms', 'us' and 'ns'. (default is 's') +- `process_messages`: Process messages with given rules before progressing or dropping, see [here](../pkg/messageProcessor/README.md) (optional) +- `meta_as_tags`: print all meta information as tags in the output (deprecated, optional) -### Using HttpSink for communication with cc-metric-store +### Using `http` sink for communication with cc-metric-store -The cc-metric-store only accepts metrics with a timestamp precision in seconds, so it is required to set `"precision": "s"`. \ No newline at end of file +The cc-metric-store only accepts metrics with a timestamp precision in seconds, so it is required to use `"precision": "s"`. \ No newline at end of file diff --git a/sinks/influxAsyncSink.go b/sinks/influxAsyncSink.go index 2f705c8..e496627 100644 --- a/sinks/influxAsyncSink.go +++ b/sinks/influxAsyncSink.go @@ -37,6 +37,8 @@ type InfluxAsyncSinkConfig struct { InfluxMaxRetryTime string `json:"max_retry_time,omitempty"` CustomFlushInterval string `json:"custom_flush_interval,omitempty"` MaxRetryAttempts uint `json:"max_retry_attempts,omitempty"` + // Timestamp precision + Precision string `json:"precision,omitempty"` } type InfluxAsyncSink struct { @@ -94,7 +96,22 @@ func (s *InfluxAsyncSink) connect() error { &tls.Config{ InsecureSkipVerify: true, }, - ).SetPrecision(time.Second) + ) + + precision := time.Second + if len(s.config.Precision) > 0 { + switch s.config.Precision { + case "s": + precision = time.Second + case "ms": + precision = time.Millisecond + case "us": + precision = time.Microsecond + case "ns": + precision = time.Nanosecond + } + } + clientOptions.SetPrecision(precision) s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database) @@ -160,6 +177,7 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) { s.config.CustomFlushInterval = "" s.customFlushInterval = time.Duration(0) s.config.MaxRetryAttempts = 1 + s.config.Precision = "s" // Default retry intervals (in seconds) // 1 2 diff --git a/sinks/influxAsyncSink.md b/sinks/influxAsyncSink.md index ddcf4b4..cedcf35 100644 --- a/sinks/influxAsyncSink.md +++ b/sinks/influxAsyncSink.md @@ -19,9 +19,13 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go "batch_size": 200, "retry_interval" : "1s", "retry_exponential_base" : 2, + "precision": "s", "max_retries": 20, "max_retry_time" : "168h", - "meta_as_tags" : [], + "process_messages" : { + "see" : "docs of message processor for valid fields" + }, + "meta_as_tags" : [] } } ``` @@ -39,6 +43,12 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go - `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2 - `max_retries`: Maximal number of retry attempts - `max_retry_time`: Maximal time to retry failed writes, default 168h (one week) -- `meta_as_tags`: move meta information keys to tags (optional) +- `precision`: Precision of the timestamp. Valid values are 's', 'ms', 'us' and 'ns'. (default is 's') +- `process_messages`: Process messages with given rules before progressing or dropping, see [here](../pkg/messageProcessor/README.md) (optional) +- `meta_as_tags`: print all meta information as tags in the output (deprecated, optional) For information about the calculation of the retry interval settings, see [offical influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes) + +### Using `influxasync` sink for communication with cc-metric-store + +The cc-metric-store only accepts metrics with a timestamp precision in seconds, so it is required to use `"precision": "s"`. \ No newline at end of file diff --git a/sinks/influxSink.go b/sinks/influxSink.go index 4fac48e..a3dba6a 100644 --- a/sinks/influxSink.go +++ b/sinks/influxSink.go @@ -59,6 +59,8 @@ type InfluxSink struct { InfluxMaxRetryTime string `json:"max_retry_time,omitempty"` // Specify whether to use GZip compression in write requests InfluxUseGzip bool `json:"use_gzip"` + // Timestamp precision + Precision string `json:"precision,omitempty"` } // influx line protocol encoder @@ -207,7 +209,20 @@ func (s *InfluxSink) connect() error { ) // Set time precision - clientOptions.SetPrecision(time.Nanosecond) + precision := time.Second + if len(s.config.Precision) > 0 { + switch s.config.Precision { + case "s": + precision = time.Second + case "ms": + precision = time.Millisecond + case "us": + precision = time.Microsecond + case "ns": + precision = time.Nanosecond + } + } + clientOptions.SetPrecision(precision) // Create new writeAPI s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions) @@ -421,6 +436,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) { // Set config default values s.config.BatchSize = 1000 s.config.FlushInterval = "1s" + s.config.Precision = "s" // Read config if len(config) > 0 { diff --git a/sinks/influxSink.md b/sinks/influxSink.md index 99390f5..acd0d06 100644 --- a/sinks/influxSink.md +++ b/sinks/influxSink.md @@ -17,14 +17,17 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de "ssl": true, "flush_delay" : "1s", "batch_size" : 1000, - "use_gzip": true - "meta_as_tags" : [], + "use_gzip": true, + "precision": "s", + "process_messages" : { + "see" : "docs of message processor for valid fields" + }, + "meta_as_tags" : [] } } ``` - `type`: makes the sink an `influxdb` sink -- `meta_as_tags`: print all meta information as tags in the output (optional) - `database`: All metrics are written to this bucket - `host`: Hostname of the InfluxDB database server - `port`: Port number (as string) of the InfluxDB database server @@ -34,6 +37,9 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de - `ssl`: Use SSL connection - `flush_delay`: Group metrics coming in to a single batch - `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay +- `precision`: Precision of the timestamp. Valid values are 's', 'ms', 'us' and 'ns'. (default is 's') +- `process_messages`: Process messages with given rules before progressing or dropping, see [here](../pkg/messageProcessor/README.md) (optional) +- `meta_as_tags`: print all meta information as tags in the output (deprecated, optional) Influx client options: ======= @@ -46,3 +52,7 @@ Influx client options: - `max_retries`: maximum count of retry attempts of failed writes - `max_retry_time`: maximum total retry timeout - `use_gzip`: Specify whether to use GZip compression in write requests + +### Using `influxdb` sink for communication with cc-metric-store + +The cc-metric-store only accepts metrics with a timestamp precision in seconds, so it is required to use `"precision": "s"`. \ No newline at end of file diff --git a/sinks/natsSink.go b/sinks/natsSink.go index 6d956fd..1982bfe 100644 --- a/sinks/natsSink.go +++ b/sinks/natsSink.go @@ -12,8 +12,9 @@ import ( lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor" - influx "github.com/influxdata/line-protocol" + influx "github.com/influxdata/line-protocol/v2/lineprotocol" nats "github.com/nats-io/nats.go" + "golang.org/x/exp/slices" ) type NatsSinkConfig struct { @@ -25,18 +26,23 @@ type NatsSinkConfig struct { Password string `json:"password,omitempty"` FlushDelay string `json:"flush_delay,omitempty"` NkeyFile string `json:"nkey_file,omitempty"` + // Timestamp precision + Precision string `json:"precision,omitempty"` } + type NatsSink struct { sink client *nats.Conn - encoder *influx.Encoder + encoder influx.Encoder buffer *bytes.Buffer config NatsSinkConfig lock sync.Mutex flushDelay time.Duration flushTimer *time.Timer + + extended_tag_list []key_value_pair } func (s *NatsSink) connect() error { @@ -73,7 +79,52 @@ func (s *NatsSink) Write(m lp.CCMessage) error { msg, err := s.mp.ProcessMessage(m) if err == nil && msg != nil { s.lock.Lock() - _, err := s.encoder.Encode(msg.ToPoint(nil)) + // Encode measurement name + s.encoder.StartLine(msg.Name()) + + // copy tags and meta data which should be used as tags + s.extended_tag_list = s.extended_tag_list[:0] + for key, value := range m.Tags() { + s.extended_tag_list = + append( + s.extended_tag_list, + key_value_pair{ + key: key, + value: value, + }, + ) + } + // Encode tags (they musts be in lexical order) + slices.SortFunc( + s.extended_tag_list, + func(a key_value_pair, b key_value_pair) int { + if a.key < b.key { + return -1 + } + if a.key > b.key { + return +1 + } + return 0 + }, + ) + for i := range s.extended_tag_list { + s.encoder.AddTag( + s.extended_tag_list[i].key, + s.extended_tag_list[i].value, + ) + } + + // Encode fields + for key, value := range msg.Fields() { + s.encoder.AddField(key, influx.MustNewValue(value)) + } + + // Encode time stamp + s.encoder.EndLine(msg.Time()) + + // Check for encoder errors + err := s.encoder.Err() + s.lock.Unlock() if err != nil { cclog.ComponentError(s.name, "Write:", err.Error()) @@ -96,8 +147,8 @@ func (s *NatsSink) Write(m lp.CCMessage) error { func (s *NatsSink) Flush() error { s.lock.Lock() - buf := append([]byte{}, s.buffer.Bytes()...) // copy bytes - s.buffer.Reset() + buf := slices.Clone(s.encoder.Bytes()) + s.encoder.Reset() s.lock.Unlock() if len(buf) == 0 { @@ -120,6 +171,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { s.name = fmt.Sprintf("NatsSink(%s)", name) s.flushDelay = 10 * time.Second s.config.Port = "4222" + s.config.Precision = "s" if len(config) > 0 { d := json.NewDecoder(bytes.NewReader(config)) d.DisallowUnknownFields() @@ -148,16 +200,26 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { for _, k := range s.config.MetaAsTags { s.mp.AddMoveMetaToTags("true", k, k) } + precision := influx.Second + if len(s.config.Precision) > 0 { + switch s.config.Precision { + case "s": + precision = influx.Second + case "ms": + precision = influx.Millisecond + case "us": + precision = influx.Microsecond + case "ns": + precision = influx.Nanosecond + } + } + // s.meta_as_tags = make(map[string]bool) // for _, k := range s.config.MetaAsTags { // s.meta_as_tags[k] = true // } // Setup Influx line protocol - s.buffer = &bytes.Buffer{} - s.buffer.Grow(1025) - s.encoder = influx.NewEncoder(s.buffer) - s.encoder.SetPrecision(time.Second) - s.encoder.SetMaxLineBytes(1024) + s.encoder.SetPrecision(precision) // Setup infos for connection if err := s.connect(); err != nil { return nil, fmt.Errorf("unable to connect: %v", err) @@ -171,6 +233,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) { return nil, err } } + s.extended_tag_list = make([]key_value_pair, 0) return s, nil } diff --git a/sinks/natsSink.md b/sinks/natsSink.md index ee32e80..8bafbb0 100644 --- a/sinks/natsSink.md +++ b/sinks/natsSink.md @@ -14,7 +14,12 @@ The `nats` sink publishes all metrics into a NATS network. The publishing key is "user": "exampleuser", "password" : "examplepw", "nkey_file": "/path/to/nkey_file", - "meta_as_tags" : [], + "flush_delay": "10s", + "precision": "s", + "process_messages" : { + "see" : "docs of message processor for valid fields" + }, + "meta_as_tags" : [] } } ``` @@ -25,5 +30,12 @@ The `nats` sink publishes all metrics into a NATS network. The publishing key is - `port`: Port number (as string) of the NATS server - `user`: Username for basic authentication - `password`: Password for basic authentication -- `meta_as_tags`: print all meta information as tags in the output (optional) - `nkey_file`: Path to credentials file with NKEY +- `flush_delay`: Maximum time until metrics are sent out +- `precision`: Precision of the timestamp. Valid values are 's', 'ms', 'us' and 'ns'. (default is 's') +- `process_messages`: Process messages with given rules before progressing or dropping, see [here](../pkg/messageProcessor/README.md) (optional) +- `meta_as_tags`: print all meta information as tags in the output (deprecated, optional) + +### Using `nats` sink for communication with cc-metric-store + +The cc-metric-store only accepts metrics with a timestamp precision in seconds, so it is required to use `"precision": "s"`. \ No newline at end of file