mirror of
https://github.com/ClusterCockpit/cc-metric-collector.git
synced 2025-04-06 21:45:55 +02:00
Add precision option to all sinks using Influx's encoder
This commit is contained in:
parent
a4f671a3cf
commit
74ebb5f48f
@ -276,7 +276,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
s.config.Timeout = "5s"
|
s.config.Timeout = "5s"
|
||||||
s.config.FlushDelay = "5s"
|
s.config.FlushDelay = "5s"
|
||||||
s.config.MaxRetries = 3
|
s.config.MaxRetries = 3
|
||||||
s.config.Precision = "ns"
|
s.config.Precision = "s"
|
||||||
cclog.ComponentDebug(s.name, "Init()")
|
cclog.ComponentDebug(s.name, "Init()")
|
||||||
|
|
||||||
// Read config
|
// Read config
|
||||||
@ -339,7 +339,7 @@ func NewHttpSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
s.mp.AddMoveMetaToTags("true", k, k)
|
s.mp.AddMoveMetaToTags("true", k, k)
|
||||||
}
|
}
|
||||||
|
|
||||||
precision := influx.Nanosecond
|
precision := influx.Second
|
||||||
if len(s.config.Precision) > 0 {
|
if len(s.config.Precision) > 0 {
|
||||||
switch s.config.Precision {
|
switch s.config.Precision {
|
||||||
case "s":
|
case "s":
|
||||||
|
@ -8,9 +8,6 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
|
|||||||
{
|
{
|
||||||
"<name>": {
|
"<name>": {
|
||||||
"type": "http",
|
"type": "http",
|
||||||
"meta_as_tags" : [
|
|
||||||
"meta-key"
|
|
||||||
],
|
|
||||||
"url" : "https://my-monitoring.example.com:1234/api/write",
|
"url" : "https://my-monitoring.example.com:1234/api/write",
|
||||||
"jwt" : "blabla.blabla.blabla",
|
"jwt" : "blabla.blabla.blabla",
|
||||||
"username": "myUser",
|
"username": "myUser",
|
||||||
@ -19,13 +16,16 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
|
|||||||
"idle_connection_timeout" : "5s",
|
"idle_connection_timeout" : "5s",
|
||||||
"flush_delay": "2s",
|
"flush_delay": "2s",
|
||||||
"batch_size": 1000,
|
"batch_size": 1000,
|
||||||
"precision": "s"
|
"precision": "s",
|
||||||
|
"process_messages" : {
|
||||||
|
"see" : "docs of message processor for valid fields"
|
||||||
|
},
|
||||||
|
"meta_as_tags" : []
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
- `type`: makes the sink an `http` sink
|
- `type`: makes the sink an `http` sink
|
||||||
- `meta_as_tags`: Move specific meta information to the tags in the output (optional)
|
|
||||||
- `url`: The full URL of the endpoint
|
- `url`: The full URL of the endpoint
|
||||||
- `jwt`: JSON web tokens for authentication (Using the *Bearer* scheme)
|
- `jwt`: JSON web tokens for authentication (Using the *Bearer* scheme)
|
||||||
- `username`: username for basic authentication
|
- `username`: username for basic authentication
|
||||||
@ -35,8 +35,10 @@ The `http` sink uses POST requests to a HTTP server to submit the metrics in the
|
|||||||
- `idle_connection_timeout`: Timeout for idle connections (default '120s'). Should be larger than the measurement interval to keep the connection open
|
- `idle_connection_timeout`: Timeout for idle connections (default '120s'). Should be larger than the measurement interval to keep the connection open
|
||||||
- `flush_delay`: Batch all writes arriving in during this duration (default '1s', batching can be disabled by setting it to 0)
|
- `flush_delay`: Batch all writes arriving in during this duration (default '1s', batching can be disabled by setting it to 0)
|
||||||
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay
|
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay
|
||||||
- `precision`: Precision of the timestamp. Valid values are 's', 'ms', 'us' and 'ns'. (default is 'ns')
|
- `precision`: Precision of the timestamp. Valid values are 's', 'ms', 'us' and 'ns'. (default is 's')
|
||||||
|
- `process_messages`: Process messages with given rules before progressing or dropping, see [here](../pkg/messageProcessor/README.md) (optional)
|
||||||
|
- `meta_as_tags`: print all meta information as tags in the output (deprecated, optional)
|
||||||
|
|
||||||
### Using HttpSink for communication with cc-metric-store
|
### Using `http` sink for communication with cc-metric-store
|
||||||
|
|
||||||
The cc-metric-store only accepts metrics with a timestamp precision in seconds, so it is required to set `"precision": "s"`.
|
The cc-metric-store only accepts metrics with a timestamp precision in seconds, so it is required to use `"precision": "s"`.
|
@ -37,6 +37,8 @@ type InfluxAsyncSinkConfig struct {
|
|||||||
InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
|
InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
|
||||||
CustomFlushInterval string `json:"custom_flush_interval,omitempty"`
|
CustomFlushInterval string `json:"custom_flush_interval,omitempty"`
|
||||||
MaxRetryAttempts uint `json:"max_retry_attempts,omitempty"`
|
MaxRetryAttempts uint `json:"max_retry_attempts,omitempty"`
|
||||||
|
// Timestamp precision
|
||||||
|
Precision string `json:"precision,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type InfluxAsyncSink struct {
|
type InfluxAsyncSink struct {
|
||||||
@ -94,7 +96,22 @@ func (s *InfluxAsyncSink) connect() error {
|
|||||||
&tls.Config{
|
&tls.Config{
|
||||||
InsecureSkipVerify: true,
|
InsecureSkipVerify: true,
|
||||||
},
|
},
|
||||||
).SetPrecision(time.Second)
|
)
|
||||||
|
|
||||||
|
precision := time.Second
|
||||||
|
if len(s.config.Precision) > 0 {
|
||||||
|
switch s.config.Precision {
|
||||||
|
case "s":
|
||||||
|
precision = time.Second
|
||||||
|
case "ms":
|
||||||
|
precision = time.Millisecond
|
||||||
|
case "us":
|
||||||
|
precision = time.Microsecond
|
||||||
|
case "ns":
|
||||||
|
precision = time.Nanosecond
|
||||||
|
}
|
||||||
|
}
|
||||||
|
clientOptions.SetPrecision(precision)
|
||||||
|
|
||||||
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
|
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
|
||||||
s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
|
s.writeApi = s.client.WriteAPI(s.config.Organization, s.config.Database)
|
||||||
@ -160,6 +177,7 @@ func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
s.config.CustomFlushInterval = ""
|
s.config.CustomFlushInterval = ""
|
||||||
s.customFlushInterval = time.Duration(0)
|
s.customFlushInterval = time.Duration(0)
|
||||||
s.config.MaxRetryAttempts = 1
|
s.config.MaxRetryAttempts = 1
|
||||||
|
s.config.Precision = "s"
|
||||||
|
|
||||||
// Default retry intervals (in seconds)
|
// Default retry intervals (in seconds)
|
||||||
// 1 2
|
// 1 2
|
||||||
|
@ -19,9 +19,13 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go
|
|||||||
"batch_size": 200,
|
"batch_size": 200,
|
||||||
"retry_interval" : "1s",
|
"retry_interval" : "1s",
|
||||||
"retry_exponential_base" : 2,
|
"retry_exponential_base" : 2,
|
||||||
|
"precision": "s",
|
||||||
"max_retries": 20,
|
"max_retries": 20,
|
||||||
"max_retry_time" : "168h",
|
"max_retry_time" : "168h",
|
||||||
"meta_as_tags" : [],
|
"process_messages" : {
|
||||||
|
"see" : "docs of message processor for valid fields"
|
||||||
|
},
|
||||||
|
"meta_as_tags" : []
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@ -39,6 +43,12 @@ The `influxasync` sink uses the official [InfluxDB golang client](https://pkg.go
|
|||||||
- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2
|
- `retry_exponential_base`: The retry interval is exponentially increased with this base, default 2
|
||||||
- `max_retries`: Maximal number of retry attempts
|
- `max_retries`: Maximal number of retry attempts
|
||||||
- `max_retry_time`: Maximal time to retry failed writes, default 168h (one week)
|
- `max_retry_time`: Maximal time to retry failed writes, default 168h (one week)
|
||||||
- `meta_as_tags`: move meta information keys to tags (optional)
|
- `precision`: Precision of the timestamp. Valid values are 's', 'ms', 'us' and 'ns'. (default is 's')
|
||||||
|
- `process_messages`: Process messages with given rules before progressing or dropping, see [here](../pkg/messageProcessor/README.md) (optional)
|
||||||
|
- `meta_as_tags`: print all meta information as tags in the output (deprecated, optional)
|
||||||
|
|
||||||
For information about the calculation of the retry interval settings, see [offical influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes)
|
For information about the calculation of the retry interval settings, see [offical influxdb-client-go documentation](https://github.com/influxdata/influxdb-client-go#handling-of-failed-async-writes)
|
||||||
|
|
||||||
|
### Using `influxasync` sink for communication with cc-metric-store
|
||||||
|
|
||||||
|
The cc-metric-store only accepts metrics with a timestamp precision in seconds, so it is required to use `"precision": "s"`.
|
@ -59,6 +59,8 @@ type InfluxSink struct {
|
|||||||
InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
|
InfluxMaxRetryTime string `json:"max_retry_time,omitempty"`
|
||||||
// Specify whether to use GZip compression in write requests
|
// Specify whether to use GZip compression in write requests
|
||||||
InfluxUseGzip bool `json:"use_gzip"`
|
InfluxUseGzip bool `json:"use_gzip"`
|
||||||
|
// Timestamp precision
|
||||||
|
Precision string `json:"precision,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// influx line protocol encoder
|
// influx line protocol encoder
|
||||||
@ -207,7 +209,20 @@ func (s *InfluxSink) connect() error {
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Set time precision
|
// Set time precision
|
||||||
clientOptions.SetPrecision(time.Nanosecond)
|
precision := time.Second
|
||||||
|
if len(s.config.Precision) > 0 {
|
||||||
|
switch s.config.Precision {
|
||||||
|
case "s":
|
||||||
|
precision = time.Second
|
||||||
|
case "ms":
|
||||||
|
precision = time.Millisecond
|
||||||
|
case "us":
|
||||||
|
precision = time.Microsecond
|
||||||
|
case "ns":
|
||||||
|
precision = time.Nanosecond
|
||||||
|
}
|
||||||
|
}
|
||||||
|
clientOptions.SetPrecision(precision)
|
||||||
|
|
||||||
// Create new writeAPI
|
// Create new writeAPI
|
||||||
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
|
s.client = influxdb2.NewClientWithOptions(uri, auth, clientOptions)
|
||||||
@ -421,6 +436,7 @@ func NewInfluxSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
// Set config default values
|
// Set config default values
|
||||||
s.config.BatchSize = 1000
|
s.config.BatchSize = 1000
|
||||||
s.config.FlushInterval = "1s"
|
s.config.FlushInterval = "1s"
|
||||||
|
s.config.Precision = "s"
|
||||||
|
|
||||||
// Read config
|
// Read config
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
|
@ -17,14 +17,17 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
|
|||||||
"ssl": true,
|
"ssl": true,
|
||||||
"flush_delay" : "1s",
|
"flush_delay" : "1s",
|
||||||
"batch_size" : 1000,
|
"batch_size" : 1000,
|
||||||
"use_gzip": true
|
"use_gzip": true,
|
||||||
"meta_as_tags" : [],
|
"precision": "s",
|
||||||
|
"process_messages" : {
|
||||||
|
"see" : "docs of message processor for valid fields"
|
||||||
|
},
|
||||||
|
"meta_as_tags" : []
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
- `type`: makes the sink an `influxdb` sink
|
- `type`: makes the sink an `influxdb` sink
|
||||||
- `meta_as_tags`: print all meta information as tags in the output (optional)
|
|
||||||
- `database`: All metrics are written to this bucket
|
- `database`: All metrics are written to this bucket
|
||||||
- `host`: Hostname of the InfluxDB database server
|
- `host`: Hostname of the InfluxDB database server
|
||||||
- `port`: Port number (as string) of the InfluxDB database server
|
- `port`: Port number (as string) of the InfluxDB database server
|
||||||
@ -34,6 +37,9 @@ The `influxdb` sink uses the official [InfluxDB golang client](https://pkg.go.de
|
|||||||
- `ssl`: Use SSL connection
|
- `ssl`: Use SSL connection
|
||||||
- `flush_delay`: Group metrics coming in to a single batch
|
- `flush_delay`: Group metrics coming in to a single batch
|
||||||
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay
|
- `batch_size`: Maximal batch size. If `batch_size` is reached before the end of `flush_delay`, the metrics are sent without further delay
|
||||||
|
- `precision`: Precision of the timestamp. Valid values are 's', 'ms', 'us' and 'ns'. (default is 's')
|
||||||
|
- `process_messages`: Process messages with given rules before progressing or dropping, see [here](../pkg/messageProcessor/README.md) (optional)
|
||||||
|
- `meta_as_tags`: print all meta information as tags in the output (deprecated, optional)
|
||||||
|
|
||||||
Influx client options:
|
Influx client options:
|
||||||
=======
|
=======
|
||||||
@ -46,3 +52,7 @@ Influx client options:
|
|||||||
- `max_retries`: maximum count of retry attempts of failed writes
|
- `max_retries`: maximum count of retry attempts of failed writes
|
||||||
- `max_retry_time`: maximum total retry timeout
|
- `max_retry_time`: maximum total retry timeout
|
||||||
- `use_gzip`: Specify whether to use GZip compression in write requests
|
- `use_gzip`: Specify whether to use GZip compression in write requests
|
||||||
|
|
||||||
|
### Using `influxdb` sink for communication with cc-metric-store
|
||||||
|
|
||||||
|
The cc-metric-store only accepts metrics with a timestamp precision in seconds, so it is required to use `"precision": "s"`.
|
@ -12,8 +12,9 @@ import (
|
|||||||
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
|
||||||
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
|
||||||
mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
|
mp "github.com/ClusterCockpit/cc-metric-collector/pkg/messageProcessor"
|
||||||
influx "github.com/influxdata/line-protocol"
|
influx "github.com/influxdata/line-protocol/v2/lineprotocol"
|
||||||
nats "github.com/nats-io/nats.go"
|
nats "github.com/nats-io/nats.go"
|
||||||
|
"golang.org/x/exp/slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
type NatsSinkConfig struct {
|
type NatsSinkConfig struct {
|
||||||
@ -25,18 +26,23 @@ type NatsSinkConfig struct {
|
|||||||
Password string `json:"password,omitempty"`
|
Password string `json:"password,omitempty"`
|
||||||
FlushDelay string `json:"flush_delay,omitempty"`
|
FlushDelay string `json:"flush_delay,omitempty"`
|
||||||
NkeyFile string `json:"nkey_file,omitempty"`
|
NkeyFile string `json:"nkey_file,omitempty"`
|
||||||
|
// Timestamp precision
|
||||||
|
Precision string `json:"precision,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
type NatsSink struct {
|
type NatsSink struct {
|
||||||
sink
|
sink
|
||||||
client *nats.Conn
|
client *nats.Conn
|
||||||
encoder *influx.Encoder
|
encoder influx.Encoder
|
||||||
buffer *bytes.Buffer
|
buffer *bytes.Buffer
|
||||||
config NatsSinkConfig
|
config NatsSinkConfig
|
||||||
|
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
flushDelay time.Duration
|
flushDelay time.Duration
|
||||||
flushTimer *time.Timer
|
flushTimer *time.Timer
|
||||||
|
|
||||||
|
extended_tag_list []key_value_pair
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *NatsSink) connect() error {
|
func (s *NatsSink) connect() error {
|
||||||
@ -73,7 +79,52 @@ func (s *NatsSink) Write(m lp.CCMessage) error {
|
|||||||
msg, err := s.mp.ProcessMessage(m)
|
msg, err := s.mp.ProcessMessage(m)
|
||||||
if err == nil && msg != nil {
|
if err == nil && msg != nil {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
_, err := s.encoder.Encode(msg.ToPoint(nil))
|
// Encode measurement name
|
||||||
|
s.encoder.StartLine(msg.Name())
|
||||||
|
|
||||||
|
// copy tags and meta data which should be used as tags
|
||||||
|
s.extended_tag_list = s.extended_tag_list[:0]
|
||||||
|
for key, value := range m.Tags() {
|
||||||
|
s.extended_tag_list =
|
||||||
|
append(
|
||||||
|
s.extended_tag_list,
|
||||||
|
key_value_pair{
|
||||||
|
key: key,
|
||||||
|
value: value,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
// Encode tags (they musts be in lexical order)
|
||||||
|
slices.SortFunc(
|
||||||
|
s.extended_tag_list,
|
||||||
|
func(a key_value_pair, b key_value_pair) int {
|
||||||
|
if a.key < b.key {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
if a.key > b.key {
|
||||||
|
return +1
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
},
|
||||||
|
)
|
||||||
|
for i := range s.extended_tag_list {
|
||||||
|
s.encoder.AddTag(
|
||||||
|
s.extended_tag_list[i].key,
|
||||||
|
s.extended_tag_list[i].value,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode fields
|
||||||
|
for key, value := range msg.Fields() {
|
||||||
|
s.encoder.AddField(key, influx.MustNewValue(value))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encode time stamp
|
||||||
|
s.encoder.EndLine(msg.Time())
|
||||||
|
|
||||||
|
// Check for encoder errors
|
||||||
|
err := s.encoder.Err()
|
||||||
|
|
||||||
s.lock.Unlock()
|
s.lock.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cclog.ComponentError(s.name, "Write:", err.Error())
|
cclog.ComponentError(s.name, "Write:", err.Error())
|
||||||
@ -96,8 +147,8 @@ func (s *NatsSink) Write(m lp.CCMessage) error {
|
|||||||
|
|
||||||
func (s *NatsSink) Flush() error {
|
func (s *NatsSink) Flush() error {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
buf := append([]byte{}, s.buffer.Bytes()...) // copy bytes
|
buf := slices.Clone(s.encoder.Bytes())
|
||||||
s.buffer.Reset()
|
s.encoder.Reset()
|
||||||
s.lock.Unlock()
|
s.lock.Unlock()
|
||||||
|
|
||||||
if len(buf) == 0 {
|
if len(buf) == 0 {
|
||||||
@ -120,6 +171,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
s.name = fmt.Sprintf("NatsSink(%s)", name)
|
s.name = fmt.Sprintf("NatsSink(%s)", name)
|
||||||
s.flushDelay = 10 * time.Second
|
s.flushDelay = 10 * time.Second
|
||||||
s.config.Port = "4222"
|
s.config.Port = "4222"
|
||||||
|
s.config.Precision = "s"
|
||||||
if len(config) > 0 {
|
if len(config) > 0 {
|
||||||
d := json.NewDecoder(bytes.NewReader(config))
|
d := json.NewDecoder(bytes.NewReader(config))
|
||||||
d.DisallowUnknownFields()
|
d.DisallowUnknownFields()
|
||||||
@ -148,16 +200,26 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
for _, k := range s.config.MetaAsTags {
|
for _, k := range s.config.MetaAsTags {
|
||||||
s.mp.AddMoveMetaToTags("true", k, k)
|
s.mp.AddMoveMetaToTags("true", k, k)
|
||||||
}
|
}
|
||||||
|
precision := influx.Second
|
||||||
|
if len(s.config.Precision) > 0 {
|
||||||
|
switch s.config.Precision {
|
||||||
|
case "s":
|
||||||
|
precision = influx.Second
|
||||||
|
case "ms":
|
||||||
|
precision = influx.Millisecond
|
||||||
|
case "us":
|
||||||
|
precision = influx.Microsecond
|
||||||
|
case "ns":
|
||||||
|
precision = influx.Nanosecond
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// s.meta_as_tags = make(map[string]bool)
|
// s.meta_as_tags = make(map[string]bool)
|
||||||
// for _, k := range s.config.MetaAsTags {
|
// for _, k := range s.config.MetaAsTags {
|
||||||
// s.meta_as_tags[k] = true
|
// s.meta_as_tags[k] = true
|
||||||
// }
|
// }
|
||||||
// Setup Influx line protocol
|
// Setup Influx line protocol
|
||||||
s.buffer = &bytes.Buffer{}
|
s.encoder.SetPrecision(precision)
|
||||||
s.buffer.Grow(1025)
|
|
||||||
s.encoder = influx.NewEncoder(s.buffer)
|
|
||||||
s.encoder.SetPrecision(time.Second)
|
|
||||||
s.encoder.SetMaxLineBytes(1024)
|
|
||||||
// Setup infos for connection
|
// Setup infos for connection
|
||||||
if err := s.connect(); err != nil {
|
if err := s.connect(); err != nil {
|
||||||
return nil, fmt.Errorf("unable to connect: %v", err)
|
return nil, fmt.Errorf("unable to connect: %v", err)
|
||||||
@ -171,6 +233,7 @@ func NewNatsSink(name string, config json.RawMessage) (Sink, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
s.extended_tag_list = make([]key_value_pair, 0)
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,12 @@ The `nats` sink publishes all metrics into a NATS network. The publishing key is
|
|||||||
"user": "exampleuser",
|
"user": "exampleuser",
|
||||||
"password" : "examplepw",
|
"password" : "examplepw",
|
||||||
"nkey_file": "/path/to/nkey_file",
|
"nkey_file": "/path/to/nkey_file",
|
||||||
"meta_as_tags" : [],
|
"flush_delay": "10s",
|
||||||
|
"precision": "s",
|
||||||
|
"process_messages" : {
|
||||||
|
"see" : "docs of message processor for valid fields"
|
||||||
|
},
|
||||||
|
"meta_as_tags" : []
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@ -25,5 +30,12 @@ The `nats` sink publishes all metrics into a NATS network. The publishing key is
|
|||||||
- `port`: Port number (as string) of the NATS server
|
- `port`: Port number (as string) of the NATS server
|
||||||
- `user`: Username for basic authentication
|
- `user`: Username for basic authentication
|
||||||
- `password`: Password for basic authentication
|
- `password`: Password for basic authentication
|
||||||
- `meta_as_tags`: print all meta information as tags in the output (optional)
|
|
||||||
- `nkey_file`: Path to credentials file with NKEY
|
- `nkey_file`: Path to credentials file with NKEY
|
||||||
|
- `flush_delay`: Maximum time until metrics are sent out
|
||||||
|
- `precision`: Precision of the timestamp. Valid values are 's', 'ms', 'us' and 'ns'. (default is 's')
|
||||||
|
- `process_messages`: Process messages with given rules before progressing or dropping, see [here](../pkg/messageProcessor/README.md) (optional)
|
||||||
|
- `meta_as_tags`: print all meta information as tags in the output (deprecated, optional)
|
||||||
|
|
||||||
|
### Using `nats` sink for communication with cc-metric-store
|
||||||
|
|
||||||
|
The cc-metric-store only accepts metrics with a timestamp precision in seconds, so it is required to use `"precision": "s"`.
|
Loading…
x
Reference in New Issue
Block a user