From 3f4b11db47f0062497943be5b13bb42ab41cd71e Mon Sep 17 00:00:00 2001 From: Holger Obermaier <40787752+ho-ob@users.noreply.github.com> Date: Mon, 18 Sep 2023 16:03:57 +0200 Subject: [PATCH] github.com/influxdata/line-protocol -> github.com/influxdata/line-protocol/v2/lineprotocol --- receivers/natsReceiver.go | 83 +++++++++++++++++++++++++++++---------- 1 file changed, 62 insertions(+), 21 deletions(-) diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go index e9cb961..ea0cc3b 100644 --- a/receivers/natsReceiver.go +++ b/receivers/natsReceiver.go @@ -8,7 +8,7 @@ import ( cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" - influx "github.com/influxdata/line-protocol" + influx "github.com/influxdata/line-protocol/v2/lineprotocol" nats "github.com/nats-io/nats.go" ) @@ -21,15 +21,9 @@ type NatsReceiverConfig struct { type NatsReceiver struct { receiver - nc *nats.Conn - handler *influx.MetricHandler - parser *influx.Parser - meta map[string]string - config NatsReceiverConfig -} - -var DefaultTime = func() time.Time { - return time.Unix(42, 0) + nc *nats.Conn + meta map[string]string + config NatsReceiverConfig } // Start subscribes to the configured NATS subject @@ -41,16 +35,66 @@ func (r *NatsReceiver) Start() { // _NatsReceive receives subscribed messages from the NATS server func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { - metrics, err := r.parser.Parse(m.Data) - if err == nil { - for _, m := range metrics { - y := lp.FromInfluxMetric(m) - for k, v := range r.meta { - y.AddMeta(k, v) + + d := influx.NewDecoderWithBytes(m.Data) + for d.Next() { + + // Decode measurement name + measurement, err := d.Measurement() + if err != nil { + msg := "_NatsReceive: Failed to decode measurement: " + err.Error() + cclog.ComponentError(r.name, msg) + return + } + + // Decode tags + tags := make(map[string]string) + for { + key, value, err := d.NextTag() + if err != nil { + msg := "_NatsReceive: Failed to decode tag: " + err.Error() + cclog.ComponentError(r.name, msg) + return } - if r.sink != nil { - r.sink <- y + if key == nil { + break } + tags[string(key)] = string(value) + } + + // Decode fields + fields := make(map[string]interface{}) + for { + key, value, err := d.NextField() + if err != nil { + msg := "_NatsReceive: Failed to decode field: " + err.Error() + cclog.ComponentError(r.name, msg) + return + } + if key == nil { + break + } + fields[string(key)] = value.Interface() + } + + // Decode time stamp + t, err := d.Time(influx.Nanosecond, time.Time{}) + if err != nil { + msg := "_NatsReceive: Failed to decode time: " + err.Error() + cclog.ComponentError(r.name, msg) + return + } + + y, _ := lp.New( + string(measurement), + tags, + r.meta, + fields, + t, + ) + + if r.sink != nil { + r.sink <- y } } } @@ -99,8 +143,5 @@ func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) { return nil, err } - r.handler = influx.NewMetricHandler() - r.parser = influx.NewParser(r.handler) - r.parser.SetTimeFunc(DefaultTime) return r, nil }