Implement nats receiver for metric data

This commit is contained in:
Lou Knauer 2021-08-20 10:43:57 +02:00
parent ee47364474
commit 13c206b11e
4 changed files with 114 additions and 18 deletions

View File

@ -1,9 +1,6 @@
package main package main
import ( /*
"log"
"os"
)
//MetricFile holds the state for a metric store file. //MetricFile holds the state for a metric store file.
//It does not export any variable. //It does not export any variable.
@ -63,3 +60,4 @@ func getFileHandle(file string, start int64) (f *File, err error) {
return f return f
} }
*/

View File

@ -17,10 +17,10 @@ type Metric struct {
// fields: metrics... // fields: metrics...
// t: timestamp (accuracy: seconds) // t: timestamp (accuracy: seconds)
type Line struct { type Line struct {
measurement string Measurement string
tags map[string]string Tags map[string]string
fields []Metric Fields []Metric
t time.Time Ts time.Time
} }
// Parse a single line as string. // Parse a single line as string.
@ -35,18 +35,18 @@ func Parse(rawline string) (*Line, error) {
} }
tagsAndMeasurement := strings.Split(parts[0], ",") tagsAndMeasurement := strings.Split(parts[0], ",")
line.measurement = tagsAndMeasurement[0] line.Measurement = tagsAndMeasurement[0]
line.tags = map[string]string{} line.Tags = map[string]string{}
for i := 1; i < len(tagsAndMeasurement); i++ { for i := 1; i < len(tagsAndMeasurement); i++ {
pair := strings.Split(tagsAndMeasurement[i], "=") pair := strings.Split(tagsAndMeasurement[i], "=")
if len(pair) != 2 { if len(pair) != 2 {
return nil, errors.New("line format error") return nil, errors.New("line format error")
} }
line.tags[pair[0]] = pair[1] line.Tags[pair[0]] = pair[1]
} }
rawfields := strings.Split(parts[1], ",") rawfields := strings.Split(parts[1], ",")
line.fields = []Metric{} line.Fields = []Metric{}
for i := 0; i < len(rawfields); i++ { for i := 0; i < len(rawfields); i++ {
pair := strings.Split(rawfields[i], "=") pair := strings.Split(rawfields[i], "=")
if len(pair) != 2 { if len(pair) != 2 {
@ -57,7 +57,7 @@ func Parse(rawline string) (*Line, error) {
return nil, err return nil, err
} }
line.fields = append(line.fields, Metric{ line.Fields = append(line.Fields, Metric{
Name: pair[0], Name: pair[0],
Value: field, Value: field,
}) })
@ -68,6 +68,6 @@ func Parse(rawline string) (*Line, error) {
return nil, err return nil, err
} }
line.t = time.Unix(unixTimestamp, 0) line.Ts = time.Unix(unixTimestamp, 0)
return line, nil return line, nil
} }

View File

@ -46,19 +46,19 @@ func TestParse(t *testing.T) {
t.Error(err) t.Error(err)
} }
if line.measurement != expectedMeasurement { if line.Measurement != expectedMeasurement {
t.Error("measurement not as expected") t.Error("measurement not as expected")
} }
if line.t.Unix() != int64(expectedTimestamp) { if line.Ts.Unix() != int64(expectedTimestamp) {
t.Error("timestamp not as expected") t.Error("timestamp not as expected")
} }
if !reflect.DeepEqual(line.tags, expectedTags) { if !reflect.DeepEqual(line.Tags, expectedTags) {
t.Error("tags not as expected") t.Error("tags not as expected")
} }
if !reflect.DeepEqual(line.fields, expectedFields) { if !reflect.DeepEqual(line.Fields, expectedFields) {
t.Error("fields not as expected") t.Error("fields not as expected")
} }
} }

98
lineprotocol/receiver.go Normal file
View File

@ -0,0 +1,98 @@
package lineprotocol
import (
"bufio"
"io"
"log"
"net"
nats "github.com/nats-io/nats.go"
)
// Listen for connections sending metric data in the line protocol format.
//
// This is a blocking function, send `true` through the channel argument to shut down the server.
// `handleLine` will be called from different go routines for different connections.
//
func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) error {
ln, err := net.Listen("tcp", address)
if err != nil {
return err
}
handleConnection := func(conn net.Conn, handleLine func(line *Line)) {
reader := bufio.NewReader(conn)
for {
rawline, err := reader.ReadString('\n')
if err == io.EOF {
return
}
if err != nil {
log.Printf("reading from connection failed: %s\n", err.Error())
return
}
line, err := Parse(rawline)
if err != nil {
log.Printf("parsing line failed: %s\n", err.Error())
return
}
handleLine(line)
}
}
go func() {
for {
stop := <-done
if stop {
err := ln.Close()
if err != nil {
log.Printf("closing listener failed: %s\n", err.Error())
}
return
}
}
}()
for {
conn, err := ln.Accept()
if err != nil {
return err
}
go handleConnection(conn, handleLine)
}
}
// Connect to a nats server and subscribe to "updates". This is a blocking
// function. handleLine will be called for each line recieved via nats.
// Send `true` through the done channel for gracefull termination.
func ReceiveNats(address string, handleLine func(line *Line), done chan bool) error {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
return err
}
defer nc.Close()
// Subscribe
if _, err := nc.Subscribe("updates", func(m *nats.Msg) {
line, err := Parse(string(m.Data))
if err != nil {
log.Printf("parsing line failed: %s\n", err.Error())
return
}
handleLine(line)
}); err != nil {
return err
}
for {
stop := <-done
if stop {
return nil
}
}
}