diff --git a/lineprotocol.go b/lineprotocol.go index dc0cad6..5b3e246 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -33,7 +33,7 @@ func (f Float) MarshalJSON() ([]byte, error) { return []byte("null"), nil } - return []byte(strconv.FormatFloat(float64(f), 'f', -1, 64)), nil + return []byte(strconv.FormatFloat(float64(f), 'f', 2, 64)), nil } func (f *Float) UnmarshalJSON(input []byte) error { @@ -71,7 +71,7 @@ type Line struct { // // There is performance to be gained by implementing a parser // that directly reads from a bufio.Scanner. -func Parse(rawline string) (*Line, error) { +func ParseLine(rawline string) (*Line, error) { line := &Line{} parts := strings.Fields(rawline) if len(parts) != 3 { @@ -116,6 +116,22 @@ func Parse(rawline string) (*Line, error) { return line, nil } +func ParseLines(raw string) ([]*Line, error) { + lines := make([]*Line, 0, 1) + for _, line := range strings.Split(raw, "\n") { + if len(line) == 0 { + continue + } + + line, err := ParseLine(line) + if err != nil { + return nil, err + } + lines = append(lines, line) + } + return lines, nil +} + // Listen for connections sending metric data in the line protocol format. // // This is a blocking function, send `true` through the channel argument to shut down the server. @@ -140,7 +156,7 @@ func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) err return } - line, err := Parse(rawline) + line, err := ParseLine(rawline) if err != nil { log.Printf("parsing line failed: %s\n", err.Error()) return @@ -187,13 +203,14 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx c if workers < 2 { sub, err = nc.Subscribe("updates", func(m *nats.Msg) { - line, err := Parse(string(m.Data)) + lines, err := ParseLines(string(m.Data)) if err != nil { - log.Printf("parsing line failed: %s\n", err.Error()) - return + log.Println(err.Error()) } - handleLine(line) + for _, line := range lines { + handleLine(line) + } }) if err != nil { return err @@ -210,14 +227,15 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx c for i := 0; i < workers; i++ { go func() { - for msg := range msgs { - line, err := Parse(string(msg.Data)) + for m := range msgs { + lines, err := ParseLines(string(m.Data)) if err != nil { - log.Printf("parsing line failed: %s\n", err.Error()) - return + log.Println(err.Error()) } - handleLine(line) + for _, line := range lines { + handleLine(line) + } } wg.Done() diff --git a/lineprotocol_test.go b/lineprotocol_test.go index 518cb8f..d601020 100644 --- a/lineprotocol_test.go +++ b/lineprotocol_test.go @@ -43,7 +43,7 @@ var expectedFields = []Metric{ var expectedTimestamp int64 = 1629356936 func TestParseLine(t *testing.T) { - line, err := Parse(raw) + line, err := ParseLine(raw) if err != nil { t.Error(err) } @@ -80,7 +80,7 @@ func BenchmarkParseLine(b *testing.B) { } line := scanner.Text() - _, err := Parse(line) + _, err := ParseLine(line) if err != nil { b.Error(err) return