From e4c3bc4db1671bf18066b611d1678800cfefd598 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Wed, 8 Sep 2021 09:08:51 +0200 Subject: [PATCH] Use Golangs contextes --- api.go | 4 ++-- archive.go | 2 +- lineprotocol.go | 32 ++++++++++++++++++++++---------- metric-store.go | 15 ++++++++------- 4 files changed, 33 insertions(+), 20 deletions(-) diff --git a/api.go b/api.go index b9403ee..92314de 100644 --- a/api.go +++ b/api.go @@ -187,7 +187,7 @@ func handleFree(rw http.ResponseWriter, r *http.Request) { rw.Write([]byte(fmt.Sprintf("buffers freed: %d\n", n))) } -func StartApiServer(address string, done chan bool) error { +func StartApiServer(address string, ctx context.Context) error { r := mux.NewRouter() r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/timeseries", handleTimeseries) @@ -210,7 +210,7 @@ func StartApiServer(address string, done chan bool) error { }() for { - _ = <-done + _ = <-ctx.Done() err := server.Shutdown(context.Background()) log.Println("API server shut down") return err diff --git a/archive.go b/archive.go index 22ae86a..ee8860c 100644 --- a/archive.go +++ b/archive.go @@ -65,7 +65,7 @@ func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) { } for metric, b := range l.metrics { - data := make([]Float, (to-from)/b.frequency) + data := make([]Float, (to-from)/b.frequency+1) data, start, end, err := b.read(from, to, data) if err != nil { return nil, err diff --git a/lineprotocol.go b/lineprotocol.go index b23330b..dc0cad6 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "context" "errors" "io" "log" @@ -175,7 +176,7 @@ func ReceiveTCP(address string, handleLine func(line *Line), done chan bool) err // Connect to a nats server and subscribe to "updates". This is a blocking // function. handleLine will be called for each line recieved via nats. // Send `true` through the done channel for gracefull termination. -func ReceiveNats(address string, handleLine func(line *Line), workers int, done chan bool) error { +func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx context.Context) error { nc, err := nats.Connect(nats.DefaultURL) if err != nil { return err @@ -194,6 +195,14 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, done handleLine(line) }) + if err != nil { + return err + } + + log.Printf("NATS subscription to 'updates' on '%s' established\n", address) + + _ = <-ctx.Done() + err = sub.Unsubscribe() } else { msgs := make(chan *nats.Msg, 16) var wg sync.WaitGroup @@ -210,14 +219,22 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, done handleLine(line) } + + wg.Done() }() } sub, err = nc.Subscribe("updates", func(m *nats.Msg) { msgs <- m }) + if err != nil { + return err + } - _ = <-done + log.Printf("NATS subscription to 'updates' on '%s' established\n", address) + + _ = <-ctx.Done() + err = sub.Unsubscribe() close(msgs) wg.Wait() } @@ -226,12 +243,7 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, done return err } - log.Printf("NATS subscription to 'updates' on '%s' established\n", address) - for { - _ = <-done - sub.Unsubscribe() - nc.Close() - log.Println("NATS connection closed") - return nil - } + nc.Close() + log.Println("NATS connection closed") + return nil } diff --git a/metric-store.go b/metric-store.go index 7eb5397..1ffd2c4 100644 --- a/metric-store.go +++ b/metric-store.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "fmt" "log" @@ -62,7 +63,7 @@ func handleLine(line *Line) { } ts := line.Ts.Unix() - log.Printf("ts=%d, tags=%v\n", ts, selector) + // log.Printf("ts=%d, tags=%v\n", ts, selector) err := memoryStore.Write(selector, ts, line.Fields) if err != nil { log.Printf("error: %s\n", err.Error()) @@ -87,15 +88,15 @@ func main() { } } + ctx, shutdown := context.WithCancel(context.Background()) + var wg sync.WaitGroup sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { _ = <-sigs log.Println("Shuting down...") - done <- true - close(done) + shutdown() }() lastCheckpoint = startupTime @@ -106,7 +107,7 @@ func main() { ticks := time.Tick(d) for { select { - case <-done: + case <-ctx.Done(): wg.Done() return case <-ticks: @@ -137,7 +138,7 @@ func main() { } go func() { - err := StartApiServer(":8080", done) + err := StartApiServer(":8080", ctx) if err != nil { log.Fatal(err) } @@ -145,7 +146,7 @@ func main() { }() go func() { - err := ReceiveNats(conf.Nats, handleLine, runtime.NumCPU()-1, done) + err := ReceiveNats(conf.Nats, handleLine, runtime.NumCPU()-1, ctx) if err != nil { log.Fatal(err) }