From 85591e7a03236c7f35561077439747442f6de8f9 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Thu, 7 Oct 2021 14:52:16 +0200 Subject: [PATCH] Switch to influxes line protocol parser --- go.mod | 3 +- go.sum | 19 +++++ lineprotocol.go | 174 ++++++++----------------------------------- lineprotocol_test.go | 89 ---------------------- 4 files changed, 50 insertions(+), 235 deletions(-) delete mode 100644 lineprotocol_test.go diff --git a/go.mod b/go.mod index 0c62c1d..791d58c 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,10 @@ module github.com/ClusterCockpit/cc-metric-store go 1.16 require ( - github.com/golang-jwt/jwt/v4 v4.0.0 // indirect + github.com/golang-jwt/jwt/v4 v4.0.0 github.com/golang/protobuf v1.5.2 // indirect github.com/gorilla/mux v1.8.0 + github.com/influxdata/line-protocol/v2 v2.2.0 github.com/nats-io/nats-server/v2 v2.2.6 // indirect github.com/nats-io/nats.go v1.11.0 ) diff --git a/go.sum b/go.sum index bcfc0ce..4b21e43 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o= @@ -14,11 +18,22 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98= +github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo= +github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY= +github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY= +github.com/influxdata/line-protocol/v2 v2.2.0 h1:UPmAqE15Hw5zu9E10SYhoXVLWnEJkWnuCbaCiRsA3c0= +github.com/influxdata/line-protocol/v2 v2.2.0/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM= github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk= github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= @@ -34,6 +49,7 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= @@ -61,3 +77,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lineprotocol.go b/lineprotocol.go index 05140d2..5fcdbee 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -3,16 +3,13 @@ package main import ( "bufio" "context" - "errors" - "io" "log" "math" "net" "strconv" - "strings" "sync" - "time" + "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/nats-io/nats.go" ) @@ -56,116 +53,17 @@ type Metric struct { Value Float } -// measurement: node or cpu -// tags: host, cluster, cpu (cpu only if measurement is cpu) -// fields: metrics... -// t: timestamp (accuracy: seconds) -type Line struct { - Measurement string - Tags map[string]string - Fields []Metric - Ts time.Time -} - -// Parse a single line as string. -// -// There is performance to be gained by implementing a parser -// that directly reads from a bufio.Scanner. -func ParseLine(rawline string) (*Line, error) { - line := &Line{} - parts := strings.Fields(rawline) - if len(parts) != 3 { - return nil, errors.New("line format error") - } - - tagsAndMeasurement := strings.Split(parts[0], ",") - line.Measurement = tagsAndMeasurement[0] - line.Tags = map[string]string{} - for i := 1; i < len(tagsAndMeasurement); i++ { - pair := strings.Split(tagsAndMeasurement[i], "=") - if len(pair) != 2 { - return nil, errors.New("line format error") - } - line.Tags[pair[0]] = pair[1] - } - - rawfields := strings.Split(parts[1], ",") - line.Fields = []Metric{} - for i := 0; i < len(rawfields); i++ { - pair := strings.Split(rawfields[i], "=") - if len(pair) != 2 { - return nil, errors.New("line format error") - } - field, err := strconv.ParseFloat(pair[1], 64) - if err != nil { - return nil, err - } - - line.Fields = append(line.Fields, Metric{ - Name: pair[0], - Value: Float(field), - }) - } - - unixTimestamp, err := strconv.ParseInt(parts[2], 10, 64) - if err != nil { - return nil, err - } - - line.Ts = time.Unix(unixTimestamp, 0) - return line, nil -} - -func ParseLines(raw string) ([]*Line, error) { - lines := make([]*Line, 0, 1) - for _, line := range strings.Split(raw, "\n") { - if len(line) == 0 { - continue - } - - line, err := ParseLine(line) - if err != nil { - return nil, err - } - lines = append(lines, line) - } - return lines, nil -} - // Listen for connections sending metric data in the line protocol format. // // This is a blocking function, send `true` through the channel argument to shut down the server. // `handleLine` will be called from different go routines for different connections. // -func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) error { +func ReceiveTCP(address string, handleLine func(dec *lineprotocol.Decoder), done chan bool) error { ln, err := net.Listen("tcp", address) if err != nil { return err } - handleConnection := func(conn net.Conn, handleLine func(line *Line)) { - reader := bufio.NewReader(conn) - for { - rawline, err := reader.ReadString('\n') - if err == io.EOF { - return - } - - if err != nil { - log.Printf("reading from connection failed: %s\n", err.Error()) - return - } - - line, err := ParseLine(rawline) - if err != nil { - log.Printf("parsing line failed: %s\n", err.Error()) - return - } - - handleLine(line) - } - } - go func() { for { stop := <-done @@ -185,57 +83,37 @@ func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) err return err } - go handleConnection(conn, handleLine) + go func() { + reader := bufio.NewReader(conn) + dec := lineprotocol.NewDecoder(reader) + handleLine(dec) + }() } } // Connect to a nats server and subscribe to "updates". This is a blocking // function. handleLine will be called for each line recieved via nats. // Send `true` through the done channel for gracefull termination. -func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx context.Context) error { +func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder), workers int, ctx context.Context) error { nc, err := nats.Connect(nats.DefaultURL) if err != nil { return err } defer nc.Close() + var wg sync.WaitGroup var sub *nats.Subscription - if workers < 2 { - sub, err = nc.Subscribe("updates", func(m *nats.Msg) { - lines, err := ParseLines(string(m.Data)) - if err != nil { - log.Println(err.Error()) - } + msgs := make(chan *nats.Msg, workers*2) - for _, line := range lines { - handleLine(line) - } - }) - if err != nil { - return err - } - - log.Printf("NATS subscription to 'updates' on '%s' established\n", address) - - <-ctx.Done() - err = sub.Unsubscribe() - } else { - msgs := make(chan *nats.Msg, 16) - var wg sync.WaitGroup + if workers > 1 { wg.Add(workers) for i := 0; i < workers; i++ { go func() { for m := range msgs { - lines, err := ParseLines(string(m.Data)) - if err != nil { - log.Println(err.Error()) - } - - for _, line := range lines { - handleLine(line) - } + dec := lineprotocol.NewDecoderWithBytes(m.Data) + handleLine(dec) } wg.Done() @@ -245,22 +123,28 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx c sub, err = nc.Subscribe("updates", func(m *nats.Msg) { msgs <- m }) - if err != nil { - return err - } - - log.Printf("NATS subscription to 'updates' on '%s' established\n", address) - - <-ctx.Done() - err = sub.Unsubscribe() - close(msgs) - wg.Wait() + } else { + sub, err = nc.Subscribe("updates", func(m *nats.Msg) { + dec := lineprotocol.NewDecoderWithBytes(m.Data) + handleLine(dec) + }) } if err != nil { return err } + log.Printf("NATS subscription to 'updates' on '%s' established\n", address) + + <-ctx.Done() + err = sub.Unsubscribe() + close(msgs) + wg.Wait() + + if err != nil { + return err + } + nc.Close() log.Println("NATS connection closed") return nil diff --git a/lineprotocol_test.go b/lineprotocol_test.go deleted file mode 100644 index d601020..0000000 --- a/lineprotocol_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package main - -import ( - "bufio" - "reflect" - "strings" - "testing" -) - -var raw = "node,host=lousxps,cluster=test mem_used=4692.252,proc_total=1083,load_five=0.91,cpu_user=1.424336e+06,cpu_guest_nice=0,cpu_guest=0,mem_available=9829.848,mem_slab=514.796,mem_free=4537.956,proc_run=2,cpu_idle=2.1589764e+07,swap_total=0,mem_cached=6368.5,swap_free=0,load_fifteen=0.93,cpu_nice=196,cpu_softirq=41456,mem_buffers=489.992,mem_total=16088.7,load_one=0.84,cpu_system=517223,cpu_iowait=8994,cpu_steal=0,cpu_irq=113265,mem_sreclaimable=362.452 1629356936\n" -var expectedMeasurement = `node` -var expectedTags = map[string]string{ - "host": "lousxps", - "cluster": "test", -} -var expectedFields = []Metric{ - {"mem_used", 4692.252}, - {"proc_total", 1083}, - {"load_five", 0.91}, - {"cpu_user", 1.424336e+06}, - {"cpu_guest_nice", 0}, - {"cpu_guest", 0}, - {"mem_available", 9829.848}, - {"mem_slab", 514.796}, - {"mem_free", 4537.956}, - {"proc_run", 2}, - {"cpu_idle", 2.1589764e+07}, - {"swap_total", 0}, - {"mem_cached", 6368.5}, - {"swap_free", 0}, - {"load_fifteen", 0.93}, - {"cpu_nice", 196}, - {"cpu_softirq", 41456}, - {"mem_buffers", 489.992}, - {"mem_total", 16088.7}, - {"load_one", 0.84}, - {"cpu_system", 517223}, - {"cpu_iowait", 8994}, - {"cpu_steal", 0}, - {"cpu_irq", 113265}, - {"mem_sreclaimable", 362.452}, -} -var expectedTimestamp int64 = 1629356936 - -func TestParseLine(t *testing.T) { - line, err := ParseLine(raw) - if err != nil { - t.Error(err) - } - - if line.Measurement != expectedMeasurement { - t.Error("measurement not as expected") - } - - if line.Ts.Unix() != int64(expectedTimestamp) { - t.Error("timestamp not as expected") - } - - if !reflect.DeepEqual(line.Tags, expectedTags) { - t.Error("tags not as expected") - } - - if !reflect.DeepEqual(line.Fields, expectedFields) { - t.Error("fields not as expected") - } -} - -func BenchmarkParseLine(b *testing.B) { - b.StopTimer() - lines := strings.Repeat(raw, b.N) - scanner := bufio.NewScanner(strings.NewReader(lines)) - scanner.Split(bufio.ScanLines) - - b.StartTimer() - for i := 0; i < b.N; i++ { - ok := scanner.Scan() - if !ok { - b.Error("woops") - return - } - - line := scanner.Text() - _, err := ParseLine(line) - if err != nil { - b.Error(err) - return - } - } -}