From 85591e7a03236c7f35561077439747442f6de8f9 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Thu, 7 Oct 2021 14:52:16 +0200 Subject: [PATCH 1/6] Switch to influxes line protocol parser --- go.mod | 3 +- go.sum | 19 +++++ lineprotocol.go | 174 ++++++++----------------------------------- lineprotocol_test.go | 89 ---------------------- 4 files changed, 50 insertions(+), 235 deletions(-) delete mode 100644 lineprotocol_test.go diff --git a/go.mod b/go.mod index 0c62c1d..791d58c 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,10 @@ module github.com/ClusterCockpit/cc-metric-store go 1.16 require ( - github.com/golang-jwt/jwt/v4 v4.0.0 // indirect + github.com/golang-jwt/jwt/v4 v4.0.0 github.com/golang/protobuf v1.5.2 // indirect github.com/gorilla/mux v1.8.0 + github.com/influxdata/line-protocol/v2 v2.2.0 github.com/nats-io/nats-server/v2 v2.2.6 // indirect github.com/nats-io/nats.go v1.11.0 ) diff --git a/go.sum b/go.sum index bcfc0ce..4b21e43 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o= @@ -14,11 +18,22 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98= +github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo= +github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY= +github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY= +github.com/influxdata/line-protocol/v2 v2.2.0 h1:UPmAqE15Hw5zu9E10SYhoXVLWnEJkWnuCbaCiRsA3c0= +github.com/influxdata/line-protocol/v2 v2.2.0/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM= github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk= github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= @@ -34,6 +49,7 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= @@ -61,3 +77,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lineprotocol.go b/lineprotocol.go index 05140d2..5fcdbee 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -3,16 +3,13 @@ package main import ( "bufio" "context" - "errors" - "io" "log" "math" "net" "strconv" - "strings" "sync" - "time" + "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/nats-io/nats.go" ) @@ -56,116 +53,17 @@ type Metric struct { Value Float } -// measurement: node or cpu -// tags: host, cluster, cpu (cpu only if measurement is cpu) -// fields: metrics... -// t: timestamp (accuracy: seconds) -type Line struct { - Measurement string - Tags map[string]string - Fields []Metric - Ts time.Time -} - -// Parse a single line as string. -// -// There is performance to be gained by implementing a parser -// that directly reads from a bufio.Scanner. -func ParseLine(rawline string) (*Line, error) { - line := &Line{} - parts := strings.Fields(rawline) - if len(parts) != 3 { - return nil, errors.New("line format error") - } - - tagsAndMeasurement := strings.Split(parts[0], ",") - line.Measurement = tagsAndMeasurement[0] - line.Tags = map[string]string{} - for i := 1; i < len(tagsAndMeasurement); i++ { - pair := strings.Split(tagsAndMeasurement[i], "=") - if len(pair) != 2 { - return nil, errors.New("line format error") - } - line.Tags[pair[0]] = pair[1] - } - - rawfields := strings.Split(parts[1], ",") - line.Fields = []Metric{} - for i := 0; i < len(rawfields); i++ { - pair := strings.Split(rawfields[i], "=") - if len(pair) != 2 { - return nil, errors.New("line format error") - } - field, err := strconv.ParseFloat(pair[1], 64) - if err != nil { - return nil, err - } - - line.Fields = append(line.Fields, Metric{ - Name: pair[0], - Value: Float(field), - }) - } - - unixTimestamp, err := strconv.ParseInt(parts[2], 10, 64) - if err != nil { - return nil, err - } - - line.Ts = time.Unix(unixTimestamp, 0) - return line, nil -} - -func ParseLines(raw string) ([]*Line, error) { - lines := make([]*Line, 0, 1) - for _, line := range strings.Split(raw, "\n") { - if len(line) == 0 { - continue - } - - line, err := ParseLine(line) - if err != nil { - return nil, err - } - lines = append(lines, line) - } - return lines, nil -} - // Listen for connections sending metric data in the line protocol format. // // This is a blocking function, send `true` through the channel argument to shut down the server. // `handleLine` will be called from different go routines for different connections. // -func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) error { +func ReceiveTCP(address string, handleLine func(dec *lineprotocol.Decoder), done chan bool) error { ln, err := net.Listen("tcp", address) if err != nil { return err } - handleConnection := func(conn net.Conn, handleLine func(line *Line)) { - reader := bufio.NewReader(conn) - for { - rawline, err := reader.ReadString('\n') - if err == io.EOF { - return - } - - if err != nil { - log.Printf("reading from connection failed: %s\n", err.Error()) - return - } - - line, err := ParseLine(rawline) - if err != nil { - log.Printf("parsing line failed: %s\n", err.Error()) - return - } - - handleLine(line) - } - } - go func() { for { stop := <-done @@ -185,57 +83,37 @@ func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) err return err } - go handleConnection(conn, handleLine) + go func() { + reader := bufio.NewReader(conn) + dec := lineprotocol.NewDecoder(reader) + handleLine(dec) + }() } } // Connect to a nats server and subscribe to "updates". This is a blocking // function. handleLine will be called for each line recieved via nats. // Send `true` through the done channel for gracefull termination. -func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx context.Context) error { +func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder), workers int, ctx context.Context) error { nc, err := nats.Connect(nats.DefaultURL) if err != nil { return err } defer nc.Close() + var wg sync.WaitGroup var sub *nats.Subscription - if workers < 2 { - sub, err = nc.Subscribe("updates", func(m *nats.Msg) { - lines, err := ParseLines(string(m.Data)) - if err != nil { - log.Println(err.Error()) - } + msgs := make(chan *nats.Msg, workers*2) - for _, line := range lines { - handleLine(line) - } - }) - if err != nil { - return err - } - - log.Printf("NATS subscription to 'updates' on '%s' established\n", address) - - <-ctx.Done() - err = sub.Unsubscribe() - } else { - msgs := make(chan *nats.Msg, 16) - var wg sync.WaitGroup + if workers > 1 { wg.Add(workers) for i := 0; i < workers; i++ { go func() { for m := range msgs { - lines, err := ParseLines(string(m.Data)) - if err != nil { - log.Println(err.Error()) - } - - for _, line := range lines { - handleLine(line) - } + dec := lineprotocol.NewDecoderWithBytes(m.Data) + handleLine(dec) } wg.Done() @@ -245,22 +123,28 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx c sub, err = nc.Subscribe("updates", func(m *nats.Msg) { msgs <- m }) - if err != nil { - return err - } - - log.Printf("NATS subscription to 'updates' on '%s' established\n", address) - - <-ctx.Done() - err = sub.Unsubscribe() - close(msgs) - wg.Wait() + } else { + sub, err = nc.Subscribe("updates", func(m *nats.Msg) { + dec := lineprotocol.NewDecoderWithBytes(m.Data) + handleLine(dec) + }) } if err != nil { return err } + log.Printf("NATS subscription to 'updates' on '%s' established\n", address) + + <-ctx.Done() + err = sub.Unsubscribe() + close(msgs) + wg.Wait() + + if err != nil { + return err + } + nc.Close() log.Println("NATS connection closed") return nil diff --git a/lineprotocol_test.go b/lineprotocol_test.go deleted file mode 100644 index d601020..0000000 --- a/lineprotocol_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package main - -import ( - "bufio" - "reflect" - "strings" - "testing" -) - -var raw = "node,host=lousxps,cluster=test mem_used=4692.252,proc_total=1083,load_five=0.91,cpu_user=1.424336e+06,cpu_guest_nice=0,cpu_guest=0,mem_available=9829.848,mem_slab=514.796,mem_free=4537.956,proc_run=2,cpu_idle=2.1589764e+07,swap_total=0,mem_cached=6368.5,swap_free=0,load_fifteen=0.93,cpu_nice=196,cpu_softirq=41456,mem_buffers=489.992,mem_total=16088.7,load_one=0.84,cpu_system=517223,cpu_iowait=8994,cpu_steal=0,cpu_irq=113265,mem_sreclaimable=362.452 1629356936\n" -var expectedMeasurement = `node` -var expectedTags = map[string]string{ - "host": "lousxps", - "cluster": "test", -} -var expectedFields = []Metric{ - {"mem_used", 4692.252}, - {"proc_total", 1083}, - {"load_five", 0.91}, - {"cpu_user", 1.424336e+06}, - {"cpu_guest_nice", 0}, - {"cpu_guest", 0}, - {"mem_available", 9829.848}, - {"mem_slab", 514.796}, - {"mem_free", 4537.956}, - {"proc_run", 2}, - {"cpu_idle", 2.1589764e+07}, - {"swap_total", 0}, - {"mem_cached", 6368.5}, - {"swap_free", 0}, - {"load_fifteen", 0.93}, - {"cpu_nice", 196}, - {"cpu_softirq", 41456}, - {"mem_buffers", 489.992}, - {"mem_total", 16088.7}, - {"load_one", 0.84}, - {"cpu_system", 517223}, - {"cpu_iowait", 8994}, - {"cpu_steal", 0}, - {"cpu_irq", 113265}, - {"mem_sreclaimable", 362.452}, -} -var expectedTimestamp int64 = 1629356936 - -func TestParseLine(t *testing.T) { - line, err := ParseLine(raw) - if err != nil { - t.Error(err) - } - - if line.Measurement != expectedMeasurement { - t.Error("measurement not as expected") - } - - if line.Ts.Unix() != int64(expectedTimestamp) { - t.Error("timestamp not as expected") - } - - if !reflect.DeepEqual(line.Tags, expectedTags) { - t.Error("tags not as expected") - } - - if !reflect.DeepEqual(line.Fields, expectedFields) { - t.Error("fields not as expected") - } -} - -func BenchmarkParseLine(b *testing.B) { - b.StopTimer() - lines := strings.Repeat(raw, b.N) - scanner := bufio.NewScanner(strings.NewReader(lines)) - scanner.Split(bufio.ScanLines) - - b.StartTimer() - for i := 0; i < b.N; i++ { - ok := scanner.Scan() - if !ok { - b.Error("woops") - return - } - - line := scanner.Text() - _, err := ParseLine(line) - if err != nil { - b.Error(err) - return - } - } -} From 2fc6ad284f4954ecee9c8e181ca0f08399c1ee72 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Thu, 7 Oct 2021 14:59:07 +0200 Subject: [PATCH 2/6] Handle new line-protrocol format in handleLine --- archive.go | 4 +++ memstore.go | 3 +- metric-store.go | 91 ++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 77 insertions(+), 21 deletions(-) diff --git a/archive.go b/archive.go index b6f0087..5e32d49 100644 --- a/archive.go +++ b/archive.go @@ -192,6 +192,10 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error { func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) { direntries, err := os.ReadDir(dir) if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + return 0, err } diff --git a/memstore.go b/memstore.go index 1bc1ce5..d9b81f2 100644 --- a/memstore.go +++ b/memstore.go @@ -283,7 +283,8 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error for _, metric := range metrics { minfo, ok := m.metrics[metric.Name] if !ok { - return errors.New("Unknown metric: " + metric.Name) + // return errors.New("Unknown metric: " + metric.Name) + continue } b := l.metrics[minfo.offset] diff --git a/metric-store.go b/metric-store.go index 5c7f806..a790494 100644 --- a/metric-store.go +++ b/metric-store.go @@ -10,6 +10,8 @@ import ( "sync" "syscall" "time" + + "github.com/influxdata/line-protocol/v2/lineprotocol" ) type MetricConfig struct { @@ -50,29 +52,78 @@ func loadConfiguration(file string) Config { return config } -func handleLine(line *Line) { - cluster, ok := line.Tags["cluster"] - if !ok { - log.Println("'cluster' tag missing") - return - } +func handleLine(dec *lineprotocol.Decoder) { + for dec.Next() { + measurement, err := dec.Measurement() + if err != nil { + log.Fatal(err) + } - host, ok := line.Tags["host"] - if !ok { - log.Println("'host' tag missing") - return - } + var cluster, host, typeName, typeId string + for { + key, val, err := dec.NextTag() + if err != nil { + log.Fatal(err) + } + if key == nil { + break + } - selector := []string{cluster, host} - if id, ok := line.Tags[line.Measurement]; ok { - selector = append(selector, line.Measurement+id) - } + switch string(key) { + case "cluster": + cluster = string(val) + case "host": + host = string(val) + case "type": + typeName = string(val) + case "type-id": + typeId = string(val) + default: + log.Fatalf("Unkown tag: '%s' (value: '%s')", string(key), string(val)) + } + } - ts := line.Ts.Unix() - // log.Printf("ts=%d, tags=%v\n", ts, selector) - err := memoryStore.Write(selector, ts, line.Fields) - if err != nil { - log.Printf("error: %s\n", err.Error()) + selector := make([]string, 0, 3) + selector = append(selector, cluster) + selector = append(selector, host) + if len(typeId) > 0 { + selector = append(selector, typeName+typeId) + } + + var value Float + for { + key, val, err := dec.NextField() + if err != nil { + log.Fatal(err) + } + + if key == nil { + break + } + + if string(key) != "value" { + log.Fatalf("Unkown field: '%s' (value: %#v)", string(key), val) + } + + if val.Kind() == lineprotocol.Float { + value = Float(val.FloatV()) + } else if val.Kind() == lineprotocol.Int { + value = Float(val.IntV()) + } else { + log.Fatalf("Unsupported value type in message: %s", val.Kind().String()) + } + } + + t, err := dec.Time(lineprotocol.Second, time.Now()) + if err != nil { + log.Fatal(err) + } + + if err := memoryStore.Write(selector, t.Unix(), []Metric{ + {Name: string(measurement), Value: value}, + }); err != nil { + log.Fatal(err) + } } } From 3aae1e80fb5d0ebfddba68a55730cec602eb10a2 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 11 Oct 2021 10:55:36 +0200 Subject: [PATCH 3/6] host to hostname in lp; update README.md --- .gitignore | 2 +- README.md | 6 +++--- metric-store.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 4de07ca..5865635 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,4 @@ # vendor/ # Project specific ignores -/archive +/var diff --git a/README.md b/README.md index b79fd65..ac7e8f5 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,8 @@ Example selectors: ### Config file +All durations are specified in seconds. + - `metrics`: Map of metric-name to objects with the following properties - `frequency`: Timestep/Interval/Resolution of this metric (In seconds) - `aggregation`: Can be `"sum"`, `"avg"` or `null` @@ -92,9 +94,7 @@ Example selectors: - `"avg"` means that values from the child levels are averaged for the parent level - `scope`: Unused at the moment, should be something like `"node"`, `"socket"` or `"cpu"` - `nats`: Url of NATS.io server (The `updates` channel will be subscribed for metrics) -- `archive-root`: Directory to be used as archive -- `restore-last-hours`: After restart, load data from the past *X* hours back to memory -- `checkpoint-interval-hours`: Every *X* hours, write currently held data to disk +- `jwt-public-key`: Base64 encoded string, use this to verify requests to the HTTP API ### Test the complete setup (excluding ClusterCockpit itself) diff --git a/metric-store.go b/metric-store.go index a790494..9909f95 100644 --- a/metric-store.go +++ b/metric-store.go @@ -72,7 +72,7 @@ func handleLine(dec *lineprotocol.Decoder) { switch string(key) { case "cluster": cluster = string(val) - case "host": + case "hostname": host = string(val) case "type": typeName = string(val) From fecc33a2242ad79ff0290be530c40167ae139d40 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 11 Oct 2021 10:56:38 +0200 Subject: [PATCH 4/6] One less allocation --- memstore.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/memstore.go b/memstore.go index d9b81f2..a6e4a97 100644 --- a/memstore.go +++ b/memstore.go @@ -16,7 +16,9 @@ const ( // So that we can reuse allocations var bufferPool sync.Pool = sync.Pool{ New: func() interface{} { - return make([]Float, 0, BUFFER_CAP) + return &buffer{ + data: make([]Float, 0, BUFFER_CAP), + } }, } @@ -37,13 +39,12 @@ type buffer struct { } func newBuffer(ts, freq int64) *buffer { - return &buffer{ - frequency: freq, - start: ts, - data: bufferPool.Get().([]Float)[:0], - prev: nil, - next: nil, - } + b := bufferPool.Get().(*buffer) + b.frequency = freq + b.start = ts + b.prev = nil + b.next = nil + return b } // If a new buffer was created, the new head is returnd. @@ -137,10 +138,11 @@ func (b *buffer) free(t int64) (int, error) { } n += 1 - bufferPool.Put(b.data) - b.data = nil + b.frequency = 0 + b.start = 0 b.next = nil b.prev = nil + bufferPool.Put(b) b = prev } return n, nil From a15238a7a9a28b9eed1b0bae8a99a1607639935d Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 11 Oct 2021 11:06:42 +0200 Subject: [PATCH 5/6] Add multiplication by time.Second --- metric-store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metric-store.go b/metric-store.go index 9909f95..9236d4f 100644 --- a/metric-store.go +++ b/metric-store.go @@ -210,7 +210,7 @@ func main() { conf = loadConfiguration("config.json") memoryStore = NewMemoryStore(conf.Metrics) - restoreFrom := startupTime.Add(-time.Duration(conf.Checkpoints.Restore)) + restoreFrom := startupTime.Add(-time.Duration(conf.Checkpoints.Restore) * time.Second) files, err := memoryStore.FromCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix()) if err != nil { log.Fatalf("Loading checkpoints failed: %s\n", err.Error()) From 807c613cae8d415ff7aee563bd7b35bdd93b76f2 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 11 Oct 2021 16:28:05 +0200 Subject: [PATCH 6/6] Add (untested) HTTP write API --- api.go | 19 +++++++++++++++++++ lineprotocol.go | 50 +++++++------------------------------------------ metric-store.go | 19 ++++++++++--------- 3 files changed, 36 insertions(+), 52 deletions(-) diff --git a/api.go b/api.go index ba6cf70..a397d0b 100644 --- a/api.go +++ b/api.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "context" "crypto/ed25519" "encoding/base64" @@ -15,6 +16,7 @@ import ( "github.com/golang-jwt/jwt/v4" "github.com/gorilla/mux" + "github.com/influxdata/line-protocol/v2/lineprotocol" ) // Example: @@ -208,6 +210,22 @@ func handlePeek(rw http.ResponseWriter, r *http.Request) { } } +func handleWrite(rw http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + reader := bufio.NewReader(r.Body) + dec := lineprotocol.NewDecoder(reader) + // Unlike the name suggests, handleLine can handle multiple lines + if err := handleLine(dec); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + rw.WriteHeader(http.StatusOK) +} + func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { authheader := r.Header.Get("Authorization") @@ -244,6 +262,7 @@ func StartApiServer(address string, ctx context.Context) error { r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats) r.HandleFunc("/api/{to:[0-9]+}/free", handleFree) r.HandleFunc("/api/{cluster}/peek", handlePeek) + r.HandleFunc("/api/write", handleWrite) server := &http.Server{ Handler: r, diff --git a/lineprotocol.go b/lineprotocol.go index 5fcdbee..1fbe6c2 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -1,11 +1,9 @@ package main import ( - "bufio" "context" "log" "math" - "net" "strconv" "sync" @@ -53,48 +51,10 @@ type Metric struct { Value Float } -// Listen for connections sending metric data in the line protocol format. -// -// This is a blocking function, send `true` through the channel argument to shut down the server. -// `handleLine` will be called from different go routines for different connections. -// -func ReceiveTCP(address string, handleLine func(dec *lineprotocol.Decoder), done chan bool) error { - ln, err := net.Listen("tcp", address) - if err != nil { - return err - } - - go func() { - for { - stop := <-done - if stop { - err := ln.Close() - if err != nil { - log.Printf("closing listener failed: %s\n", err.Error()) - } - return - } - } - }() - - for { - conn, err := ln.Accept() - if err != nil { - return err - } - - go func() { - reader := bufio.NewReader(conn) - dec := lineprotocol.NewDecoder(reader) - handleLine(dec) - }() - } -} - // Connect to a nats server and subscribe to "updates". This is a blocking // function. handleLine will be called for each line recieved via nats. // Send `true` through the done channel for gracefull termination. -func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder), workers int, ctx context.Context) error { +func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder) error, workers int, ctx context.Context) error { nc, err := nats.Connect(nats.DefaultURL) if err != nil { return err @@ -113,7 +73,9 @@ func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder), wor go func() { for m := range msgs { dec := lineprotocol.NewDecoderWithBytes(m.Data) - handleLine(dec) + if err := handleLine(dec); err != nil { + log.Printf("error: %s\n", err.Error()) + } } wg.Done() @@ -126,7 +88,9 @@ func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder), wor } else { sub, err = nc.Subscribe("updates", func(m *nats.Msg) { dec := lineprotocol.NewDecoderWithBytes(m.Data) - handleLine(dec) + if err := handleLine(dec); err != nil { + log.Printf("error: %s\n", err.Error()) + } }) } diff --git a/metric-store.go b/metric-store.go index 9236d4f..d983c05 100644 --- a/metric-store.go +++ b/metric-store.go @@ -52,18 +52,18 @@ func loadConfiguration(file string) Config { return config } -func handleLine(dec *lineprotocol.Decoder) { +func handleLine(dec *lineprotocol.Decoder) error { for dec.Next() { measurement, err := dec.Measurement() if err != nil { - log.Fatal(err) + return err } var cluster, host, typeName, typeId string for { key, val, err := dec.NextTag() if err != nil { - log.Fatal(err) + return err } if key == nil { break @@ -79,7 +79,7 @@ func handleLine(dec *lineprotocol.Decoder) { case "type-id": typeId = string(val) default: - log.Fatalf("Unkown tag: '%s' (value: '%s')", string(key), string(val)) + return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val)) } } @@ -94,7 +94,7 @@ func handleLine(dec *lineprotocol.Decoder) { for { key, val, err := dec.NextField() if err != nil { - log.Fatal(err) + return err } if key == nil { @@ -102,7 +102,7 @@ func handleLine(dec *lineprotocol.Decoder) { } if string(key) != "value" { - log.Fatalf("Unkown field: '%s' (value: %#v)", string(key), val) + return fmt.Errorf("unkown field: '%s' (value: %#v)", string(key), val) } if val.Kind() == lineprotocol.Float { @@ -110,21 +110,22 @@ func handleLine(dec *lineprotocol.Decoder) { } else if val.Kind() == lineprotocol.Int { value = Float(val.IntV()) } else { - log.Fatalf("Unsupported value type in message: %s", val.Kind().String()) + return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) } } t, err := dec.Time(lineprotocol.Second, time.Now()) if err != nil { - log.Fatal(err) + return err } if err := memoryStore.Write(selector, t.Unix(), []Metric{ {Name: string(measurement), Value: value}, }); err != nil { - log.Fatal(err) + return err } } + return nil } func intervals(wg *sync.WaitGroup, ctx context.Context) {