From b1e6b8e37900eca3fdd5a22c4dcc486c6ac64464 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Tue, 18 May 2021 15:15:26 +0200 Subject: [PATCH] Add Nats receiver --- receivers/metricReceiver.go | 45 +++++++++++++++++++++++ receivers/natsReceiver.go | 72 +++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 receivers/metricReceiver.go create mode 100644 receivers/natsReceiver.go diff --git a/receivers/metricReceiver.go b/receivers/metricReceiver.go new file mode 100644 index 0000000..8ceae10 --- /dev/null +++ b/receivers/metricReceiver.go @@ -0,0 +1,45 @@ +package receivers + +import ( + // "time" + s "github.com/ClusterCockpit/cc-metric-collector/sinks" + influx "github.com/influxdata/line-protocol" +) + +type ReceiverConfig struct { + Addr string `json:"host"` + Port string `json:"port"` + Database string `json:"database"` + Type string `json:"type"` +} + +type Receiver struct { + name string + addr string + port string + database string + organization string + sink s.SinkFuncs +} + +type ReceiverFuncs interface { + Init(config ReceiverConfig, sink s.SinkFuncs) error + Start() + Close() +} + +func Tags2Map(metric influx.Metric) map[string]string { + tags := make(map[string]string) + for _, t := range metric.TagList() { + tags[t.Key] = t.Value + } + return tags +} + +func Fields2Map(metric influx.Metric) map[string]interface{} { + fields := make(map[string]interface{}) + for _, f := range metric.FieldList() { + fields[f.Key] = f.Value + } + return fields +} diff --git a/receivers/natsReceiver.go b/receivers/natsReceiver.go new file mode 100644 index 0000000..43113b7 --- /dev/null +++ b/receivers/natsReceiver.go @@ -0,0 +1,72 @@ +package receivers + +import ( + s "github.com/ClusterCockpit/cc-metric-collector/sinks" + protocol "github.com/influxdata/line-protocol" + nats "github.com/nats-io/nats.go" + "log" + "time" + "errors" +) + +type NatsReceiver struct { + Receiver + nc *nats.Conn +} + +var DefaultTime = func() time.Time { + return time.Unix(42, 0) +} + +func (r *NatsReceiver) Init(config ReceiverConfig, sink s.SinkFuncs) error { + if len(config.Addr) == 0 || + len(config.Port) == 0 || + len(config.Database) == 0 { + return errors.New("Not all configuration variables set required by NatsReceiver") + } + r.addr = config.Addr + if len(r.addr) == 0 { + r.addr = nats.DefaultURL + } + r.port = config.Port + if len(r.port) == 0 { + r.port = "4222" + } + log.Print("Init NATS Receiver") + nc, err := nats.Connect(r.addr) + if err == nil { + r.database = config.Database + r.sink = sink + r.nc = nc + } else { + log.Print(err) + r.nc = nil + } + return err +} + +func (r *NatsReceiver) Start() { + log.Print("Start NATS Receiver") + r.nc.Subscribe(r.database, r._NatsReceive) +} + +func (r *NatsReceiver) _NatsReceive(m *nats.Msg) { + handler := protocol.NewMetricHandler() + parser := protocol.NewParser(handler) + parser.SetTimeFunc(DefaultTime) + metrics, err := parser.Parse(m.Data) + if err == nil { + for _, m := range metrics { + tags := Tags2Map(m) + fields := Fields2Map(m) + r.sink.Write(m.Name(), tags, fields, m.Time()) + } + } +} + +func (r *NatsReceiver) Close() { + if r.nc != nil { + log.Print("Close NATS Receiver") + r.nc.Close() + } +}