diff --git a/fileStore.go b/fileStore.go deleted file mode 100644 index cb0aaf6..0000000 --- a/fileStore.go +++ /dev/null @@ -1,63 +0,0 @@ -package main - -/* - -//MetricFile holds the state for a metric store file. -//It does not export any variable. -type FileStore struct { - metrics map[string]int - numMetrics int - size int64 - root string -} - -func getFileName(tp string, ts string, start int64) string { - -} - -func newFileStore(root string, size int64, o []string) { - var f FileStore - f.root = root - f.size = size - - for i, name := range o { - f.metrics[name] = i * f.size * 8 - } -} - -func openFile(fp string, hd *FileHeader) (f *File, err error) { - f, err = os.OpenFile(file, os.O_WRONLY, 0644) - - if err != nil { - return f, err - } - -} - -func createFile(fp string) (f *File, err error) { - f, err = os.Create(fp) - - if err != nil { - return f, err - } - -} - -func getFileHandle(file string, start int64) (f *File, err error) { - f, err = os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0644) - - if err != nil { - return f, err - } - - if _, err := f.Write([]byte("appended some data\n")); err != nil { - f.Close() // ignore error; Write error takes precedence - log.Fatal(err) - } - if err := f.Close(); err != nil { - log.Fatal(err) - } - - return f -} -*/ diff --git a/fileStore_test.go b/fileStore_test.go deleted file mode 100644 index 9e62fe3..0000000 --- a/fileStore_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package main - -import "testing" - -func TestAddMetrics(t *testing.T) { - -} diff --git a/lineprotocol.go b/lineprotocol.go new file mode 100644 index 0000000..7970550 --- /dev/null +++ b/lineprotocol.go @@ -0,0 +1,203 @@ +package main + +import ( + "bufio" + "errors" + "io" + "log" + "math" + "net" + "strconv" + "strings" + "time" + + "github.com/nats-io/nats.go" +) + +// Go's JSON encoder for floats does not support NaN (https://github.com/golang/go/issues/3480). +// This program uses NaN as a signal for missing data. +// For the HTTP JSON API to be able to handle NaN values, +// we have to use our own type which implements encoding/json.Marshaler itself. +type Float float64 + +var NaN Float = Float(math.NaN()) + +func (f Float) IsNaN() bool { + return math.IsNaN(float64(f)) +} + +func (f Float) MarshalJSON() ([]byte, error) { + if math.IsNaN(float64(f)) { + return []byte("null"), nil + } + + return []byte(strconv.FormatFloat(float64(f), 'f', -1, 64)), nil +} + +func (f *Float) UnmarshalJSON(input []byte) error { + s := string(input) + if s == "null" { + *f = NaN + return nil + } + + val, err := strconv.ParseFloat(s, 64) + if err != nil { + return err + } + *f = Float(val) + return nil +} + +type Metric struct { + Name string + Value Float +} + +// measurement: node or cpu +// tags: host, cluster, cpu (cpu only if measurement is cpu) +// fields: metrics... +// t: timestamp (accuracy: seconds) +type Line struct { + Measurement string + Tags map[string]string + Fields []Metric + Ts time.Time +} + +// Parse a single line as string. +// +// There is performance to be gained by implementing a parser +// that directly reads from a bufio.Scanner. +func Parse(rawline string) (*Line, error) { + line := &Line{} + parts := strings.Fields(rawline) + if len(parts) != 3 { + return nil, errors.New("line format error") + } + + tagsAndMeasurement := strings.Split(parts[0], ",") + line.Measurement = tagsAndMeasurement[0] + line.Tags = map[string]string{} + for i := 1; i < len(tagsAndMeasurement); i++ { + pair := strings.Split(tagsAndMeasurement[i], "=") + if len(pair) != 2 { + return nil, errors.New("line format error") + } + line.Tags[pair[0]] = pair[1] + } + + rawfields := strings.Split(parts[1], ",") + line.Fields = []Metric{} + for i := 0; i < len(rawfields); i++ { + pair := strings.Split(rawfields[i], "=") + if len(pair) != 2 { + return nil, errors.New("line format error") + } + field, err := strconv.ParseFloat(pair[1], 64) + if err != nil { + return nil, err + } + + line.Fields = append(line.Fields, Metric{ + Name: pair[0], + Value: Float(field), + }) + } + + unixTimestamp, err := strconv.ParseInt(parts[2], 10, 64) + if err != nil { + return nil, err + } + + line.Ts = time.Unix(unixTimestamp, 0) + return line, nil +} + +// Listen for connections sending metric data in the line protocol format. +// +// This is a blocking function, send `true` through the channel argument to shut down the server. +// `handleLine` will be called from different go routines for different connections. +// +func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) error { + ln, err := net.Listen("tcp", address) + if err != nil { + return err + } + + handleConnection := func(conn net.Conn, handleLine func(line *Line)) { + reader := bufio.NewReader(conn) + for { + rawline, err := reader.ReadString('\n') + if err == io.EOF { + return + } + + if err != nil { + log.Printf("reading from connection failed: %s\n", err.Error()) + return + } + + line, err := Parse(rawline) + if err != nil { + log.Printf("parsing line failed: %s\n", err.Error()) + return + } + + handleLine(line) + } + } + + go func() { + for { + stop := <-done + if stop { + err := ln.Close() + if err != nil { + log.Printf("closing listener failed: %s\n", err.Error()) + } + return + } + } + }() + + for { + conn, err := ln.Accept() + if err != nil { + return err + } + + go handleConnection(conn, handleLine) + } +} + +// Connect to a nats server and subscribe to "updates". This is a blocking +// function. handleLine will be called for each line recieved via nats. +// Send `true` through the done channel for gracefull termination. +func ReceiveNats(address string, handleLine func(line *Line), done chan bool) error { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + return err + } + defer nc.Close() + + // Subscribe + if _, err := nc.Subscribe("updates", func(m *nats.Msg) { + line, err := Parse(string(m.Data)) + if err != nil { + log.Printf("parsing line failed: %s\n", err.Error()) + return + } + + handleLine(line) + }); err != nil { + return err + } + + log.Printf("NATS subscription to 'updates' on '%s' established\n", address) + for { + _ = <-done + log.Println("NATS connection closed") + return nil + } +} diff --git a/lineprotocol/lineprotocol.go b/lineprotocol/lineprotocol.go deleted file mode 100644 index 4505b1d..0000000 --- a/lineprotocol/lineprotocol.go +++ /dev/null @@ -1,88 +0,0 @@ -package lineprotocol - -import ( - "errors" - "math" - "strconv" - "strings" - "time" -) - -// Go's JSON encoder for floats does not support NaN (https://github.com/golang/go/issues/3480). -// This program uses NaN as a signal for missing data. -// For the HTTP JSON API to be able to handle NaN values, -// we have to use our own type which implements encoding/json.Marshaler itself. -type Float float64 - -func (f Float) MarshalJSON() ([]byte, error) { - if math.IsNaN(float64(f)) { - return []byte("null"), nil - } - - return []byte(strconv.FormatFloat(float64(f), 'f', -1, 64)), nil -} - -type Metric struct { - Name string - Value Float -} - -// measurement: node or cpu -// tags: host, cluster, cpu (cpu only if measurement is cpu) -// fields: metrics... -// t: timestamp (accuracy: seconds) -type Line struct { - Measurement string - Tags map[string]string - Fields []Metric - Ts time.Time -} - -// Parse a single line as string. -// -// There is performance to be gained by implementing a parser -// that directly reads from a bufio.Scanner. -func Parse(rawline string) (*Line, error) { - line := &Line{} - parts := strings.Fields(rawline) - if len(parts) != 3 { - return nil, errors.New("line format error") - } - - tagsAndMeasurement := strings.Split(parts[0], ",") - line.Measurement = tagsAndMeasurement[0] - line.Tags = map[string]string{} - for i := 1; i < len(tagsAndMeasurement); i++ { - pair := strings.Split(tagsAndMeasurement[i], "=") - if len(pair) != 2 { - return nil, errors.New("line format error") - } - line.Tags[pair[0]] = pair[1] - } - - rawfields := strings.Split(parts[1], ",") - line.Fields = []Metric{} - for i := 0; i < len(rawfields); i++ { - pair := strings.Split(rawfields[i], "=") - if len(pair) != 2 { - return nil, errors.New("line format error") - } - field, err := strconv.ParseFloat(pair[1], 64) - if err != nil { - return nil, err - } - - line.Fields = append(line.Fields, Metric{ - Name: pair[0], - Value: Float(field), - }) - } - - unixTimestamp, err := strconv.ParseInt(parts[2], 10, 64) - if err != nil { - return nil, err - } - - line.Ts = time.Unix(unixTimestamp, 0) - return line, nil -} diff --git a/lineprotocol/lineprotocol_test.go b/lineprotocol/lineprotocol_test.go deleted file mode 100644 index 8cc95cb..0000000 --- a/lineprotocol/lineprotocol_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package lineprotocol - -import ( - "reflect" - "testing" -) - -func TestParse(t *testing.T) { - raw := `node,host=lousxps,cluster=test mem_used=4692.252,proc_total=1083,load_five=0.91,cpu_user=1.424336e+06,cpu_guest_nice=0,cpu_guest=0,mem_available=9829.848,mem_slab=514.796,mem_free=4537.956,proc_run=2,cpu_idle=2.1589764e+07,swap_total=0,mem_cached=6368.5,swap_free=0,load_fifteen=0.93,cpu_nice=196,cpu_softirq=41456,mem_buffers=489.992,mem_total=16088.7,load_one=0.84,cpu_system=517223,cpu_iowait=8994,cpu_steal=0,cpu_irq=113265,mem_sreclaimable=362.452 1629356936` - expectedMeasurement := `node` - expectedTags := map[string]string{ - "host": "lousxps", - "cluster": "test", - } - expectedFields := []Metric{ - {"mem_used", 4692.252}, - {"proc_total", 1083}, - {"load_five", 0.91}, - {"cpu_user", 1.424336e+06}, - {"cpu_guest_nice", 0}, - {"cpu_guest", 0}, - {"mem_available", 9829.848}, - {"mem_slab", 514.796}, - {"mem_free", 4537.956}, - {"proc_run", 2}, - {"cpu_idle", 2.1589764e+07}, - {"swap_total", 0}, - {"mem_cached", 6368.5}, - {"swap_free", 0}, - {"load_fifteen", 0.93}, - {"cpu_nice", 196}, - {"cpu_softirq", 41456}, - {"mem_buffers", 489.992}, - {"mem_total", 16088.7}, - {"load_one", 0.84}, - {"cpu_system", 517223}, - {"cpu_iowait", 8994}, - {"cpu_steal", 0}, - {"cpu_irq", 113265}, - {"mem_sreclaimable", 362.452}, - } - expectedTimestamp := 1629356936 - - line, err := Parse(raw) - if err != nil { - t.Error(err) - } - - if line.Measurement != expectedMeasurement { - t.Error("measurement not as expected") - } - - if line.Ts.Unix() != int64(expectedTimestamp) { - t.Error("timestamp not as expected") - } - - if !reflect.DeepEqual(line.Tags, expectedTags) { - t.Error("tags not as expected") - } - - if !reflect.DeepEqual(line.Fields, expectedFields) { - t.Error("fields not as expected") - } -} diff --git a/lineprotocol/receiver.go b/lineprotocol/receiver.go deleted file mode 100644 index 7209936..0000000 --- a/lineprotocol/receiver.go +++ /dev/null @@ -1,98 +0,0 @@ -package lineprotocol - -import ( - "bufio" - "io" - "log" - "net" - - nats "github.com/nats-io/nats.go" -) - -// Listen for connections sending metric data in the line protocol format. -// -// This is a blocking function, send `true` through the channel argument to shut down the server. -// `handleLine` will be called from different go routines for different connections. -// -func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) error { - ln, err := net.Listen("tcp", address) - if err != nil { - return err - } - - handleConnection := func(conn net.Conn, handleLine func(line *Line)) { - reader := bufio.NewReader(conn) - for { - rawline, err := reader.ReadString('\n') - if err == io.EOF { - return - } - - if err != nil { - log.Printf("reading from connection failed: %s\n", err.Error()) - return - } - - line, err := Parse(rawline) - if err != nil { - log.Printf("parsing line failed: %s\n", err.Error()) - return - } - - handleLine(line) - } - } - - go func() { - for { - stop := <-done - if stop { - err := ln.Close() - if err != nil { - log.Printf("closing listener failed: %s\n", err.Error()) - } - return - } - } - }() - - for { - conn, err := ln.Accept() - if err != nil { - return err - } - - go handleConnection(conn, handleLine) - } -} - -// Connect to a nats server and subscribe to "updates". This is a blocking -// function. handleLine will be called for each line recieved via nats. -// Send `true` through the done channel for gracefull termination. -func ReceiveNats(address string, handleLine func(line *Line), done chan bool) error { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - return err - } - defer nc.Close() - - // Subscribe - if _, err := nc.Subscribe("updates", func(m *nats.Msg) { - line, err := Parse(string(m.Data)) - if err != nil { - log.Printf("parsing line failed: %s\n", err.Error()) - return - } - - handleLine(line) - }); err != nil { - return err - } - - log.Printf("NATS subscription to 'updates' on '%s' established\n", address) - for { - _ = <-done - log.Println("NATS connection closed") - return nil - } -} diff --git a/lineprotocol_test.go b/lineprotocol_test.go new file mode 100644 index 0000000..518cb8f --- /dev/null +++ b/lineprotocol_test.go @@ -0,0 +1,89 @@ +package main + +import ( + "bufio" + "reflect" + "strings" + "testing" +) + +var raw = "node,host=lousxps,cluster=test mem_used=4692.252,proc_total=1083,load_five=0.91,cpu_user=1.424336e+06,cpu_guest_nice=0,cpu_guest=0,mem_available=9829.848,mem_slab=514.796,mem_free=4537.956,proc_run=2,cpu_idle=2.1589764e+07,swap_total=0,mem_cached=6368.5,swap_free=0,load_fifteen=0.93,cpu_nice=196,cpu_softirq=41456,mem_buffers=489.992,mem_total=16088.7,load_one=0.84,cpu_system=517223,cpu_iowait=8994,cpu_steal=0,cpu_irq=113265,mem_sreclaimable=362.452 1629356936\n" +var expectedMeasurement = `node` +var expectedTags = map[string]string{ + "host": "lousxps", + "cluster": "test", +} +var expectedFields = []Metric{ + {"mem_used", 4692.252}, + {"proc_total", 1083}, + {"load_five", 0.91}, + {"cpu_user", 1.424336e+06}, + {"cpu_guest_nice", 0}, + {"cpu_guest", 0}, + {"mem_available", 9829.848}, + {"mem_slab", 514.796}, + {"mem_free", 4537.956}, + {"proc_run", 2}, + {"cpu_idle", 2.1589764e+07}, + {"swap_total", 0}, + {"mem_cached", 6368.5}, + {"swap_free", 0}, + {"load_fifteen", 0.93}, + {"cpu_nice", 196}, + {"cpu_softirq", 41456}, + {"mem_buffers", 489.992}, + {"mem_total", 16088.7}, + {"load_one", 0.84}, + {"cpu_system", 517223}, + {"cpu_iowait", 8994}, + {"cpu_steal", 0}, + {"cpu_irq", 113265}, + {"mem_sreclaimable", 362.452}, +} +var expectedTimestamp int64 = 1629356936 + +func TestParseLine(t *testing.T) { + line, err := Parse(raw) + if err != nil { + t.Error(err) + } + + if line.Measurement != expectedMeasurement { + t.Error("measurement not as expected") + } + + if line.Ts.Unix() != int64(expectedTimestamp) { + t.Error("timestamp not as expected") + } + + if !reflect.DeepEqual(line.Tags, expectedTags) { + t.Error("tags not as expected") + } + + if !reflect.DeepEqual(line.Fields, expectedFields) { + t.Error("fields not as expected") + } +} + +func BenchmarkParseLine(b *testing.B) { + b.StopTimer() + lines := strings.Repeat(raw, b.N) + scanner := bufio.NewScanner(strings.NewReader(lines)) + scanner.Split(bufio.ScanLines) + + b.StartTimer() + for i := 0; i < b.N; i++ { + ok := scanner.Scan() + if !ok { + b.Error("woops") + return + } + + line := scanner.Text() + _, err := Parse(line) + if err != nil { + b.Error(err) + return + } + } +}