Switch to influxes line protocol parser

This commit is contained in:
Lou Knauer 2021-10-07 14:52:16 +02:00
parent 3b2ec98ba0
commit 85591e7a03
4 changed files with 50 additions and 235 deletions

View File

@ -3,9 +3,10 @@ module github.com/ClusterCockpit/cc-metric-store
go 1.16
require (
github.com/golang-jwt/jwt/v4 v4.0.0 // indirect
github.com/golang-jwt/jwt/v4 v4.0.0
github.com/golang/protobuf v1.5.2 // indirect
github.com/gorilla/mux v1.8.0
github.com/influxdata/line-protocol/v2 v2.2.0
github.com/nats-io/nats-server/v2 v2.2.6 // indirect
github.com/nats-io/nats.go v1.11.0

View File

@ -1,3 +1,7 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o=
@ -14,11 +18,22 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98=
github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo=
github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY=
github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY=
github.com/influxdata/line-protocol/v2 v2.2.0 h1:UPmAqE15Hw5zu9E10SYhoXVLWnEJkWnuCbaCiRsA3c0=
github.com/influxdata/line-protocol/v2 v2.2.0/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM=
github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk=
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
@ -34,6 +49,7 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
@ -61,3 +77,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -3,16 +3,13 @@ package main
import (
@ -56,116 +53,17 @@ type Metric struct {
Value Float
// measurement: node or cpu
// tags: host, cluster, cpu (cpu only if measurement is cpu)
// fields: metrics...
// t: timestamp (accuracy: seconds)
type Line struct {
Measurement string
Tags map[string]string
Fields []Metric
Ts time.Time
// Parse a single line as string.
// There is performance to be gained by implementing a parser
// that directly reads from a bufio.Scanner.
func ParseLine(rawline string) (*Line, error) {
line := &Line{}
parts := strings.Fields(rawline)
if len(parts) != 3 {
return nil, errors.New("line format error")
tagsAndMeasurement := strings.Split(parts[0], ",")
line.Measurement = tagsAndMeasurement[0]
line.Tags = map[string]string{}
for i := 1; i < len(tagsAndMeasurement); i++ {
pair := strings.Split(tagsAndMeasurement[i], "=")
if len(pair) != 2 {
return nil, errors.New("line format error")
line.Tags[pair[0]] = pair[1]
rawfields := strings.Split(parts[1], ",")
line.Fields = []Metric{}
for i := 0; i < len(rawfields); i++ {
pair := strings.Split(rawfields[i], "=")
if len(pair) != 2 {
return nil, errors.New("line format error")
field, err := strconv.ParseFloat(pair[1], 64)
if err != nil {
return nil, err
line.Fields = append(line.Fields, Metric{
Name: pair[0],
Value: Float(field),
unixTimestamp, err := strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return nil, err
line.Ts = time.Unix(unixTimestamp, 0)
return line, nil
func ParseLines(raw string) ([]*Line, error) {
lines := make([]*Line, 0, 1)
for _, line := range strings.Split(raw, "\n") {
if len(line) == 0 {
line, err := ParseLine(line)
if err != nil {
return nil, err
lines = append(lines, line)
return lines, nil
// Listen for connections sending metric data in the line protocol format.
// This is a blocking function, send `true` through the channel argument to shut down the server.
// `handleLine` will be called from different go routines for different connections.
func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) error {
func ReceiveTCP(address string, handleLine func(dec *lineprotocol.Decoder), done chan bool) error {
ln, err := net.Listen("tcp", address)
if err != nil {
return err
handleConnection := func(conn net.Conn, handleLine func(line *Line)) {
reader := bufio.NewReader(conn)
for {
rawline, err := reader.ReadString('\n')
if err == io.EOF {
if err != nil {
log.Printf("reading from connection failed: %s\n", err.Error())
line, err := ParseLine(rawline)
if err != nil {
log.Printf("parsing line failed: %s\n", err.Error())
go func() {
for {
stop := <-done
@ -185,57 +83,37 @@ func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) err
return err
go handleConnection(conn, handleLine)
go func() {
reader := bufio.NewReader(conn)
dec := lineprotocol.NewDecoder(reader)
// Connect to a nats server and subscribe to "updates". This is a blocking
// function. handleLine will be called for each line recieved via nats.
// Send `true` through the done channel for gracefull termination.
func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx context.Context) error {
func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder), workers int, ctx context.Context) error {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
return err
defer nc.Close()
var wg sync.WaitGroup
var sub *nats.Subscription
if workers < 2 {
sub, err = nc.Subscribe("updates", func(m *nats.Msg) {
lines, err := ParseLines(string(m.Data))
if err != nil {
msgs := make(chan *nats.Msg, workers*2)
for _, line := range lines {
if err != nil {
return err
log.Printf("NATS subscription to 'updates' on '%s' established\n", address)
err = sub.Unsubscribe()
} else {
msgs := make(chan *nats.Msg, 16)
var wg sync.WaitGroup
if workers > 1 {
for i := 0; i < workers; i++ {
go func() {
for m := range msgs {
lines, err := ParseLines(string(m.Data))
if err != nil {
for _, line := range lines {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
@ -245,22 +123,28 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx c
sub, err = nc.Subscribe("updates", func(m *nats.Msg) {
msgs <- m
if err != nil {
return err
log.Printf("NATS subscription to 'updates' on '%s' established\n", address)
err = sub.Unsubscribe()
} else {
sub, err = nc.Subscribe("updates", func(m *nats.Msg) {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err != nil {
return err
log.Printf("NATS subscription to 'updates' on '%s' established\n", address)
err = sub.Unsubscribe()
if err != nil {
return err
log.Println("NATS connection closed")
return nil

View File

@ -1,89 +0,0 @@
package main
import (
var raw = "node,host=lousxps,cluster=test mem_used=4692.252,proc_total=1083,load_five=0.91,cpu_user=1.424336e+06,cpu_guest_nice=0,cpu_guest=0,mem_available=9829.848,mem_slab=514.796,mem_free=4537.956,proc_run=2,cpu_idle=2.1589764e+07,swap_total=0,mem_cached=6368.5,swap_free=0,load_fifteen=0.93,cpu_nice=196,cpu_softirq=41456,mem_buffers=489.992,mem_total=16088.7,load_one=0.84,cpu_system=517223,cpu_iowait=8994,cpu_steal=0,cpu_irq=113265,mem_sreclaimable=362.452 1629356936\n"
var expectedMeasurement = `node`
var expectedTags = map[string]string{
"host": "lousxps",
"cluster": "test",
var expectedFields = []Metric{
{"mem_used", 4692.252},
{"proc_total", 1083},
{"load_five", 0.91},
{"cpu_user", 1.424336e+06},
{"cpu_guest_nice", 0},
{"cpu_guest", 0},
{"mem_available", 9829.848},
{"mem_slab", 514.796},
{"mem_free", 4537.956},
{"proc_run", 2},
{"cpu_idle", 2.1589764e+07},
{"swap_total", 0},
{"mem_cached", 6368.5},
{"swap_free", 0},
{"load_fifteen", 0.93},
{"cpu_nice", 196},
{"cpu_softirq", 41456},
{"mem_buffers", 489.992},
{"mem_total", 16088.7},
{"load_one", 0.84},
{"cpu_system", 517223},
{"cpu_iowait", 8994},
{"cpu_steal", 0},
{"cpu_irq", 113265},
{"mem_sreclaimable", 362.452},
var expectedTimestamp int64 = 1629356936
func TestParseLine(t *testing.T) {
line, err := ParseLine(raw)
if err != nil {
if line.Measurement != expectedMeasurement {
t.Error("measurement not as expected")
if line.Ts.Unix() != int64(expectedTimestamp) {
t.Error("timestamp not as expected")
if !reflect.DeepEqual(line.Tags, expectedTags) {
t.Error("tags not as expected")
if !reflect.DeepEqual(line.Fields, expectedFields) {
t.Error("fields not as expected")
func BenchmarkParseLine(b *testing.B) {
lines := strings.Repeat(raw, b.N)
scanner := bufio.NewScanner(strings.NewReader(lines))
for i := 0; i < b.N; i++ {
ok := scanner.Scan()
if !ok {
line := scanner.Text()
_, err := ParseLine(line)
if err != nil {