github.com/influxdata/line-protocol -> github.com/influxdata/line-protocol/v2/lineprotocol

This commit is contained in:
Holger Obermaier 2023-09-18 16:03:57 +02:00
parent fd227ed8b3
commit 3f4b11db47

View File

@ -8,7 +8,7 @@ import (
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
influx "github.com/influxdata/line-protocol" influx "github.com/influxdata/line-protocol/v2/lineprotocol"
nats "github.com/nats-io/nats.go" nats "github.com/nats-io/nats.go"
) )
@ -22,16 +22,10 @@ type NatsReceiverConfig struct {
type NatsReceiver struct { type NatsReceiver struct {
receiver receiver
nc *nats.Conn nc *nats.Conn
handler *influx.MetricHandler
parser *influx.Parser
meta map[string]string meta map[string]string
config NatsReceiverConfig config NatsReceiverConfig
} }
var DefaultTime = func() time.Time {
return time.Unix(42, 0)
}
// Start subscribes to the configured NATS subject // Start subscribes to the configured NATS subject
// Messages wil be handled by r._NatsReceive // Messages wil be handled by r._NatsReceive
func (r *NatsReceiver) Start() { func (r *NatsReceiver) Start() {
@ -41,19 +35,69 @@ func (r *NatsReceiver) Start() {
// _NatsReceive receives subscribed messages from the NATS server // _NatsReceive receives subscribed messages from the NATS server
func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { func (r *NatsReceiver) _NatsReceive(m *nats.Msg) {
metrics, err := r.parser.Parse(m.Data)
if err == nil { d := influx.NewDecoderWithBytes(m.Data)
for _, m := range metrics { for d.Next() {
y := lp.FromInfluxMetric(m)
for k, v := range r.meta { // Decode measurement name
y.AddMeta(k, v) measurement, err := d.Measurement()
if err != nil {
msg := "_NatsReceive: Failed to decode measurement: " + err.Error()
cclog.ComponentError(r.name, msg)
return
} }
// Decode tags
tags := make(map[string]string)
for {
key, value, err := d.NextTag()
if err != nil {
msg := "_NatsReceive: Failed to decode tag: " + err.Error()
cclog.ComponentError(r.name, msg)
return
}
if key == nil {
break
}
tags[string(key)] = string(value)
}
// Decode fields
fields := make(map[string]interface{})
for {
key, value, err := d.NextField()
if err != nil {
msg := "_NatsReceive: Failed to decode field: " + err.Error()
cclog.ComponentError(r.name, msg)
return
}
if key == nil {
break
}
fields[string(key)] = value.Interface()
}
// Decode time stamp
t, err := d.Time(influx.Nanosecond, time.Time{})
if err != nil {
msg := "_NatsReceive: Failed to decode time: " + err.Error()
cclog.ComponentError(r.name, msg)
return
}
y, _ := lp.New(
string(measurement),
tags,
r.meta,
fields,
t,
)
if r.sink != nil { if r.sink != nil {
r.sink <- y r.sink <- y
} }
} }
} }
}
// Close closes the connection to the NATS server // Close closes the connection to the NATS server
func (r *NatsReceiver) Close() { func (r *NatsReceiver) Close() {
@ -99,8 +143,5 @@ func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error) {
return nil, err return nil, err
} }
r.handler = influx.NewMetricHandler()
r.parser = influx.NewParser(r.handler)
r.parser.SetTimeFunc(DefaultTime)
return r, nil return r, nil
} }