From d8e25063f9377561b1e0a7ea9379812b29fffe75 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Tue, 22 Feb 2022 13:58:10 +0100 Subject: [PATCH] allow setting the cluster tag via query parameter --- api.go | 2 +- lineprotocol.go | 12 +++++++----- lineprotocol_test.go | 4 ++-- metric-store.go | 3 +++ openapi.yaml | 5 +++++ 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/api.go b/api.go index ab294b4..66fa0ff 100644 --- a/api.go +++ b/api.go @@ -136,7 +136,7 @@ func handleWrite(rw http.ResponseWriter, r *http.Request) { } dec := lineprotocol.NewDecoderWithBytes(bytes) - if err := decodeLine(dec); err != nil { + if err := decodeLine(dec, r.URL.Query().Get("cluster")); err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) return } diff --git a/lineprotocol.go b/lineprotocol.go index a1829f1..d8b4af5 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -20,7 +20,7 @@ type Metric struct { // Connect to a nats server and subscribe to "updates". This is a blocking // function. handleLine will be called for each line recieved via nats. // Send `true` through the done channel for gracefull termination. -func ReceiveNats(conf *NatsConfig, handleLine func(dec *lineprotocol.Decoder) error, workers int, ctx context.Context) error { +func ReceiveNats(conf *NatsConfig, handleLine func(*lineprotocol.Decoder, string) error, workers int, ctx context.Context) error { var opts []nats.Option if conf.Username != "" && conf.Password != "" { opts = append(opts, nats.UserInfo(conf.Username, conf.Password)) @@ -44,7 +44,7 @@ func ReceiveNats(conf *NatsConfig, handleLine func(dec *lineprotocol.Decoder) er go func() { for m := range msgs { dec := lineprotocol.NewDecoderWithBytes(m.Data) - if err := handleLine(dec); err != nil { + if err := handleLine(dec, conf.ClusterTag); err != nil { log.Printf("error: %s\n", err.Error()) } } @@ -59,7 +59,7 @@ func ReceiveNats(conf *NatsConfig, handleLine func(dec *lineprotocol.Decoder) er } else { sub, err = nc.Subscribe(conf.SubscribeTo, func(m *nats.Msg) { dec := lineprotocol.NewDecoderWithBytes(m.Data) - if err := handleLine(dec); err != nil { + if err := handleLine(dec, conf.ClusterTag); err != nil { log.Printf("error: %s\n", err.Error()) } }) @@ -104,7 +104,9 @@ func reorder(buf, prefix []byte) []byte { } } -func decodeLine(dec *lineprotocol.Decoder) error { +// Decode lines using dec and make write calls to the MemoryStore. +// If a line is missing its cluster tag, use clusterDefault as default. +func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { // Reduce allocations in loop: t := time.Now() metrics := make([]Metric, 0, 10) @@ -138,7 +140,7 @@ func decodeLine(dec *lineprotocol.Decoder) error { } typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0] - var cluster, host string + cluster, host := clusterDefault, "" for { key, val, err := dec.NextTag() if err != nil { diff --git a/lineprotocol_test.go b/lineprotocol_test.go index 83efe84..520003c 100644 --- a/lineprotocol_test.go +++ b/lineprotocol_test.go @@ -10,7 +10,7 @@ import ( const TestDataClassicFormat string = ` m1,cluster=ctest,hostname=htest1,type=node value=1 123456789 m2,cluster=ctest,hostname=htest1,type=node value=2 123456789 -m3,cluster=ctest,hostname=htest2,type=node value=3 123456789 +m3,hostname=htest2,type=node value=3 123456789 m4,cluster=ctest,hostname=htest2,type=core,type-id=1 value=4 123456789 m4,cluster=ctest,hostname=htest2,type-id=2,type=core value=5 123456789 ` @@ -29,7 +29,7 @@ func TestLineprotocolDecoder(t *testing.T) { }) dec := lineprotocol.NewDecoderWithBytes([]byte(TestDataClassicFormat)) - if err := decodeLine(dec); err != nil { + if err := decodeLine(dec, "ctest"); err != nil { log.Fatal(err) } diff --git a/metric-store.go b/metric-store.go index ecec370..28f3e9a 100644 --- a/metric-store.go +++ b/metric-store.go @@ -41,6 +41,9 @@ type NatsConfig struct { // Channel name SubscribeTo string `json:"subscribe-to"` + // Allow lines without a cluster tag, use this as default, optional + ClusterTag string `json:"cluster-tag"` + // Username/Password, optional Username string `json:"username"` Password string `json:"password"` diff --git a/openapi.yaml b/openapi.yaml index e3d31f6..02b2e71 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -10,6 +10,11 @@ paths: post: operationId: 'writeMetrics' description: 'Recieves metrics in the influx line-protocol using [this format](https://github.com/ClusterCockpit/cc-specifications/blob/master/metrics/lineprotocol_alternative.md)' + parameters: + - name: cluster + in: query + schema: { type: string } + description: "If the lines in the body do not have a cluster tag, use this value instead." requestBody: required: true content: