diff --git a/api.go b/api.go index ba6cf70..a397d0b 100644 --- a/api.go +++ b/api.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "context" "crypto/ed25519" "encoding/base64" @@ -15,6 +16,7 @@ import ( "github.com/golang-jwt/jwt/v4" "github.com/gorilla/mux" + "github.com/influxdata/line-protocol/v2/lineprotocol" ) // Example: @@ -208,6 +210,22 @@ func handlePeek(rw http.ResponseWriter, r *http.Request) { } } +func handleWrite(rw http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + reader := bufio.NewReader(r.Body) + dec := lineprotocol.NewDecoder(reader) + // Unlike the name suggests, handleLine can handle multiple lines + if err := handleLine(dec); err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + rw.WriteHeader(http.StatusOK) +} + func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { authheader := r.Header.Get("Authorization") @@ -244,6 +262,7 @@ func StartApiServer(address string, ctx context.Context) error { r.HandleFunc("/api/{from:[0-9]+}/{to:[0-9]+}/stats", handleStats) r.HandleFunc("/api/{to:[0-9]+}/free", handleFree) r.HandleFunc("/api/{cluster}/peek", handlePeek) + r.HandleFunc("/api/write", handleWrite) server := &http.Server{ Handler: r, diff --git a/lineprotocol.go b/lineprotocol.go index 5fcdbee..1fbe6c2 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -1,11 +1,9 @@ package main import ( - "bufio" "context" "log" "math" - "net" "strconv" "sync" @@ -53,48 +51,10 @@ type Metric struct { Value Float } -// Listen for connections sending metric data in the line protocol format. -// -// This is a blocking function, send `true` through the channel argument to shut down the server. -// `handleLine` will be called from different go routines for different connections. -// -func ReceiveTCP(address string, handleLine func(dec *lineprotocol.Decoder), done chan bool) error { - ln, err := net.Listen("tcp", address) - if err != nil { - return err - } - - go func() { - for { - stop := <-done - if stop { - err := ln.Close() - if err != nil { - log.Printf("closing listener failed: %s\n", err.Error()) - } - return - } - } - }() - - for { - conn, err := ln.Accept() - if err != nil { - return err - } - - go func() { - reader := bufio.NewReader(conn) - dec := lineprotocol.NewDecoder(reader) - handleLine(dec) - }() - } -} - // Connect to a nats server and subscribe to "updates". This is a blocking // function. handleLine will be called for each line recieved via nats. // Send `true` through the done channel for gracefull termination. -func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder), workers int, ctx context.Context) error { +func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder) error, workers int, ctx context.Context) error { nc, err := nats.Connect(nats.DefaultURL) if err != nil { return err @@ -113,7 +73,9 @@ func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder), wor go func() { for m := range msgs { dec := lineprotocol.NewDecoderWithBytes(m.Data) - handleLine(dec) + if err := handleLine(dec); err != nil { + log.Printf("error: %s\n", err.Error()) + } } wg.Done() @@ -126,7 +88,9 @@ func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder), wor } else { sub, err = nc.Subscribe("updates", func(m *nats.Msg) { dec := lineprotocol.NewDecoderWithBytes(m.Data) - handleLine(dec) + if err := handleLine(dec); err != nil { + log.Printf("error: %s\n", err.Error()) + } }) } diff --git a/metric-store.go b/metric-store.go index 9236d4f..d983c05 100644 --- a/metric-store.go +++ b/metric-store.go @@ -52,18 +52,18 @@ func loadConfiguration(file string) Config { return config } -func handleLine(dec *lineprotocol.Decoder) { +func handleLine(dec *lineprotocol.Decoder) error { for dec.Next() { measurement, err := dec.Measurement() if err != nil { - log.Fatal(err) + return err } var cluster, host, typeName, typeId string for { key, val, err := dec.NextTag() if err != nil { - log.Fatal(err) + return err } if key == nil { break @@ -79,7 +79,7 @@ func handleLine(dec *lineprotocol.Decoder) { case "type-id": typeId = string(val) default: - log.Fatalf("Unkown tag: '%s' (value: '%s')", string(key), string(val)) + return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val)) } } @@ -94,7 +94,7 @@ func handleLine(dec *lineprotocol.Decoder) { for { key, val, err := dec.NextField() if err != nil { - log.Fatal(err) + return err } if key == nil { @@ -102,7 +102,7 @@ func handleLine(dec *lineprotocol.Decoder) { } if string(key) != "value" { - log.Fatalf("Unkown field: '%s' (value: %#v)", string(key), val) + return fmt.Errorf("unkown field: '%s' (value: %#v)", string(key), val) } if val.Kind() == lineprotocol.Float { @@ -110,21 +110,22 @@ func handleLine(dec *lineprotocol.Decoder) { } else if val.Kind() == lineprotocol.Int { value = Float(val.IntV()) } else { - log.Fatalf("Unsupported value type in message: %s", val.Kind().String()) + return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) } } t, err := dec.Time(lineprotocol.Second, time.Now()) if err != nil { - log.Fatal(err) + return err } if err := memoryStore.Write(selector, t.Unix(), []Metric{ {Name: string(measurement), Value: value}, }); err != nil { - log.Fatal(err) + return err } } + return nil } func intervals(wg *sync.WaitGroup, ctx context.Context) {