From 13c206b11e5aac5a1ff6b098cc5b998bbbdf430e Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Fri, 20 Aug 2021 10:43:57 +0200 Subject: [PATCH] Implement nats receiver for metric data --- fileStore.go | 6 +- lineprotocol/lineprotocol.go | 20 +++---- lineprotocol/lineprotocol_test.go | 8 +-- lineprotocol/receiver.go | 98 +++++++++++++++++++++++++++++++ 4 files changed, 114 insertions(+), 18 deletions(-) create mode 100644 lineprotocol/receiver.go diff --git a/fileStore.go b/fileStore.go index cf0a29d..cb0aaf6 100644 --- a/fileStore.go +++ b/fileStore.go @@ -1,9 +1,6 @@ package main -import ( - "log" - "os" -) +/* //MetricFile holds the state for a metric store file. //It does not export any variable. @@ -63,3 +60,4 @@ func getFileHandle(file string, start int64) (f *File, err error) { return f } +*/ diff --git a/lineprotocol/lineprotocol.go b/lineprotocol/lineprotocol.go index 6987ffb..97ad8d9 100644 --- a/lineprotocol/lineprotocol.go +++ b/lineprotocol/lineprotocol.go @@ -17,10 +17,10 @@ type Metric struct { // fields: metrics... // t: timestamp (accuracy: seconds) type Line struct { - measurement string - tags map[string]string - fields []Metric - t time.Time + Measurement string + Tags map[string]string + Fields []Metric + Ts time.Time } // Parse a single line as string. @@ -35,18 +35,18 @@ func Parse(rawline string) (*Line, error) { } tagsAndMeasurement := strings.Split(parts[0], ",") - line.measurement = tagsAndMeasurement[0] - line.tags = map[string]string{} + line.Measurement = tagsAndMeasurement[0] + line.Tags = map[string]string{} for i := 1; i < len(tagsAndMeasurement); i++ { pair := strings.Split(tagsAndMeasurement[i], "=") if len(pair) != 2 { return nil, errors.New("line format error") } - line.tags[pair[0]] = pair[1] + line.Tags[pair[0]] = pair[1] } rawfields := strings.Split(parts[1], ",") - line.fields = []Metric{} + line.Fields = []Metric{} for i := 0; i < len(rawfields); i++ { pair := strings.Split(rawfields[i], "=") if len(pair) != 2 { @@ -57,7 +57,7 @@ func Parse(rawline string) (*Line, error) { return nil, err } - line.fields = append(line.fields, Metric{ + line.Fields = append(line.Fields, Metric{ Name: pair[0], Value: field, }) @@ -68,6 +68,6 @@ func Parse(rawline string) (*Line, error) { return nil, err } - line.t = time.Unix(unixTimestamp, 0) + line.Ts = time.Unix(unixTimestamp, 0) return line, nil } diff --git a/lineprotocol/lineprotocol_test.go b/lineprotocol/lineprotocol_test.go index 1f17cb6..8cc95cb 100644 --- a/lineprotocol/lineprotocol_test.go +++ b/lineprotocol/lineprotocol_test.go @@ -46,19 +46,19 @@ func TestParse(t *testing.T) { t.Error(err) } - if line.measurement != expectedMeasurement { + if line.Measurement != expectedMeasurement { t.Error("measurement not as expected") } - if line.t.Unix() != int64(expectedTimestamp) { + if line.Ts.Unix() != int64(expectedTimestamp) { t.Error("timestamp not as expected") } - if !reflect.DeepEqual(line.tags, expectedTags) { + if !reflect.DeepEqual(line.Tags, expectedTags) { t.Error("tags not as expected") } - if !reflect.DeepEqual(line.fields, expectedFields) { + if !reflect.DeepEqual(line.Fields, expectedFields) { t.Error("fields not as expected") } } diff --git a/lineprotocol/receiver.go b/lineprotocol/receiver.go new file mode 100644 index 0000000..041292f --- /dev/null +++ b/lineprotocol/receiver.go @@ -0,0 +1,98 @@ +package lineprotocol + +import ( + "bufio" + "io" + "log" + "net" + + nats "github.com/nats-io/nats.go" +) + +// Listen for connections sending metric data in the line protocol format. +// +// This is a blocking function, send `true` through the channel argument to shut down the server. +// `handleLine` will be called from different go routines for different connections. +// +func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) error { + ln, err := net.Listen("tcp", address) + if err != nil { + return err + } + + handleConnection := func(conn net.Conn, handleLine func(line *Line)) { + reader := bufio.NewReader(conn) + for { + rawline, err := reader.ReadString('\n') + if err == io.EOF { + return + } + + if err != nil { + log.Printf("reading from connection failed: %s\n", err.Error()) + return + } + + line, err := Parse(rawline) + if err != nil { + log.Printf("parsing line failed: %s\n", err.Error()) + return + } + + handleLine(line) + } + } + + go func() { + for { + stop := <-done + if stop { + err := ln.Close() + if err != nil { + log.Printf("closing listener failed: %s\n", err.Error()) + } + return + } + } + }() + + for { + conn, err := ln.Accept() + if err != nil { + return err + } + + go handleConnection(conn, handleLine) + } +} + +// Connect to a nats server and subscribe to "updates". This is a blocking +// function. handleLine will be called for each line recieved via nats. +// Send `true` through the done channel for gracefull termination. +func ReceiveNats(address string, handleLine func(line *Line), done chan bool) error { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + return err + } + defer nc.Close() + + // Subscribe + if _, err := nc.Subscribe("updates", func(m *nats.Msg) { + line, err := Parse(string(m.Data)) + if err != nil { + log.Printf("parsing line failed: %s\n", err.Error()) + return + } + + handleLine(line) + }); err != nil { + return err + } + + for { + stop := <-done + if stop { + return nil + } + } +}