Support more than one line per message

This commit is contained in:
Lou Knauer 2021-09-13 12:26:55 +02:00
parent 53d5734fd5
commit 4d17abdbc8
2 changed files with 32 additions and 14 deletions

View File

@ -33,7 +33,7 @@ func (f Float) MarshalJSON() ([]byte, error) {
return []byte("null"), nil return []byte("null"), nil
} }
return []byte(strconv.FormatFloat(float64(f), 'f', -1, 64)), nil return []byte(strconv.FormatFloat(float64(f), 'f', 2, 64)), nil
} }
func (f *Float) UnmarshalJSON(input []byte) error { func (f *Float) UnmarshalJSON(input []byte) error {
@ -71,7 +71,7 @@ type Line struct {
// //
// There is performance to be gained by implementing a parser // There is performance to be gained by implementing a parser
// that directly reads from a bufio.Scanner. // that directly reads from a bufio.Scanner.
func Parse(rawline string) (*Line, error) { func ParseLine(rawline string) (*Line, error) {
line := &Line{} line := &Line{}
parts := strings.Fields(rawline) parts := strings.Fields(rawline)
if len(parts) != 3 { if len(parts) != 3 {
@ -116,6 +116,22 @@ func Parse(rawline string) (*Line, error) {
return line, nil return line, nil
} }
func ParseLines(raw string) ([]*Line, error) {
lines := make([]*Line, 0, 1)
for _, line := range strings.Split(raw, "\n") {
if len(line) == 0 {
continue
}
line, err := ParseLine(line)
if err != nil {
return nil, err
}
lines = append(lines, line)
}
return lines, nil
}
// Listen for connections sending metric data in the line protocol format. // Listen for connections sending metric data in the line protocol format.
// //
// This is a blocking function, send `true` through the channel argument to shut down the server. // This is a blocking function, send `true` through the channel argument to shut down the server.
@ -140,7 +156,7 @@ func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) err
return return
} }
line, err := Parse(rawline) line, err := ParseLine(rawline)
if err != nil { if err != nil {
log.Printf("parsing line failed: %s\n", err.Error()) log.Printf("parsing line failed: %s\n", err.Error())
return return
@ -187,13 +203,14 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx c
if workers < 2 { if workers < 2 {
sub, err = nc.Subscribe("updates", func(m *nats.Msg) { sub, err = nc.Subscribe("updates", func(m *nats.Msg) {
line, err := Parse(string(m.Data)) lines, err := ParseLines(string(m.Data))
if err != nil { if err != nil {
log.Printf("parsing line failed: %s\n", err.Error()) log.Println(err.Error())
return
} }
handleLine(line) for _, line := range lines {
handleLine(line)
}
}) })
if err != nil { if err != nil {
return err return err
@ -210,14 +227,15 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx c
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go func() { go func() {
for msg := range msgs { for m := range msgs {
line, err := Parse(string(msg.Data)) lines, err := ParseLines(string(m.Data))
if err != nil { if err != nil {
log.Printf("parsing line failed: %s\n", err.Error()) log.Println(err.Error())
return
} }
handleLine(line) for _, line := range lines {
handleLine(line)
}
} }
wg.Done() wg.Done()

View File

@ -43,7 +43,7 @@ var expectedFields = []Metric{
var expectedTimestamp int64 = 1629356936 var expectedTimestamp int64 = 1629356936
func TestParseLine(t *testing.T) { func TestParseLine(t *testing.T) {
line, err := Parse(raw) line, err := ParseLine(raw)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -80,7 +80,7 @@ func BenchmarkParseLine(b *testing.B) {
} }
line := scanner.Text() line := scanner.Text()
_, err := Parse(line) _, err := ParseLine(line)
if err != nil { if err != nil {
b.Error(err) b.Error(err)
return return