diff --git a/archive.go b/archive.go index be47696..313ef4c 100644 --- a/archive.go +++ b/archive.go @@ -26,6 +26,9 @@ type CheckpointMetrics struct { Data []Float `json:"data"` } +// As `Float` implements a custom MarshalJSON() function, +// serializing an array of such types has more overhead +// than one would assume (because of extra allocations, interfaces and so on). func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) { buf := make([]byte, 0, 128+len(cm.Data)*8) buf = append(buf, `{"frequency":`...) @@ -224,7 +227,7 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { // Different host's data is loaded to memory in parallel. func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { var wg sync.WaitGroup - work := make(chan [2]string, NumWorkers) + work := make(chan [2]string, NumWorkers*2) n, errs := int32(0), int32(0) wg.Add(NumWorkers) diff --git a/lineprotocol.go b/lineprotocol.go index f9d0b1b..ab7e332 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -2,8 +2,10 @@ package main import ( "context" + "errors" "fmt" "log" + "net" "sync" "time" @@ -13,8 +15,70 @@ import ( type Metric struct { Name string - minfo metricInfo Value Float + + mc MetricConfig +} + +// Currently unused, could be used to send messages via raw TCP. +// Each connection is handled in it's own goroutine. This is a blocking function. +func ReceiveRaw(ctx context.Context, listener net.Listener, handleLine func(*lineprotocol.Decoder, string) error) error { + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + <-ctx.Done() + if err := listener.Close(); err != nil { + log.Printf("listener.Close(): %s", err.Error()) + } + }() + + for { + conn, err := listener.Accept() + if err != nil { + if errors.Is(err, net.ErrClosed) { + break + } + + log.Printf("listener.Accept(): %s", err.Error()) + } + + wg.Add(2) + go func() { + defer wg.Done() + defer conn.Close() + + dec := lineprotocol.NewDecoder(conn) + connctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + defer wg.Done() + select { + case <-connctx.Done(): + conn.Close() + case <-ctx.Done(): + conn.Close() + } + }() + + if err := handleLine(dec, "default"); err != nil { + if errors.Is(err, net.ErrClosed) { + return + } + + log.Printf("%s: %s", conn.RemoteAddr().String(), err.Error()) + errmsg := make([]byte, 128) + errmsg = append(errmsg, `error: `...) + errmsg = append(errmsg, err.Error()...) + errmsg = append(errmsg, '\n') + conn.Write(errmsg) + } + }() + } + + wg.Wait() + return nil } // Connect to a nats server and subscribe to "updates". This is a blocking diff --git a/lineprotocol_test.go b/lineprotocol_test.go index 520003c..a6df786 100644 --- a/lineprotocol_test.go +++ b/lineprotocol_test.go @@ -1,7 +1,9 @@ package main import ( + "bytes" "log" + "strconv" "testing" "github.com/influxdata/line-protocol/v2/lineprotocol" @@ -15,6 +17,54 @@ m4,cluster=ctest,hostname=htest2,type=core,type-id=1 value=4 123456789 m4,cluster=ctest,hostname=htest2,type-id=2,type=core value=5 123456789 ` +const BenchmarkLineBatch string = ` +nm1,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 +nm2,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 +nm3,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 +nm4,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 +nm5,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 +nm6,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 +nm7,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 +nm8,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 +nm9,cluster=ctest,hostname=htest1,type=node value=123.0 123456789 +cm1,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 +cm2,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 +cm3,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 +cm4,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 +cm5,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 +cm6,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 +cm7,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 +cm8,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 +cm9,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789 +cm1,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 +cm2,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 +cm3,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 +cm4,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 +cm5,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 +cm6,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 +cm7,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 +cm8,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 +cm9,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789 +cm1,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 +cm2,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 +cm3,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 +cm4,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 +cm5,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 +cm6,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 +cm7,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 +cm8,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 +cm9,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789 +cm1,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 +cm2,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 +cm3,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 +cm4,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 +cm5,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 +cm6,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 +cm7,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 +cm8,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 +cm9,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789 +` + func TestLineprotocolDecoder(t *testing.T) { prevMemoryStore := memoryStore t.Cleanup(func() { @@ -56,3 +106,39 @@ func TestLineprotocolDecoder(t *testing.T) { log.Fatal() } } + +func BenchmarkLineprotocolDecoder(b *testing.B) { + b.StopTimer() + memoryStore = NewMemoryStore(map[string]MetricConfig{ + "nm1": {Frequency: 1}, + "nm2": {Frequency: 1}, + "nm3": {Frequency: 1}, + "nm4": {Frequency: 1}, + "nm5": {Frequency: 1}, + "nm6": {Frequency: 1}, + "nm7": {Frequency: 1}, + "nm8": {Frequency: 1}, + "nm9": {Frequency: 1}, + "cm1": {Frequency: 1}, + "cm2": {Frequency: 1}, + "cm3": {Frequency: 1}, + "cm4": {Frequency: 1}, + "cm5": {Frequency: 1}, + "cm6": {Frequency: 1}, + "cm7": {Frequency: 1}, + "cm8": {Frequency: 1}, + "cm9": {Frequency: 1}, + }) + + for i := 0; i < b.N; i++ { + data := []byte(BenchmarkLineBatch) + data = bytes.ReplaceAll(data, []byte("123456789"), []byte(strconv.Itoa(i+123456789))) + dec := lineprotocol.NewDecoderWithBytes(data) + + b.StartTimer() + if err := decodeLine(dec, "ctest"); err != nil { + b.Fatal(err) + } + b.StopTimer() + } +}