allow setting the cluster tag via query parameter

This commit is contained in:
Lou Knauer 2022-02-22 13:58:10 +01:00
parent 182a1fa67d
commit d8e25063f9
5 changed files with 18 additions and 8 deletions

2
api.go
View File

@ -136,7 +136,7 @@ func handleWrite(rw http.ResponseWriter, r *http.Request) {
}
dec := lineprotocol.NewDecoderWithBytes(bytes)
if err := decodeLine(dec); err != nil {
if err := decodeLine(dec, r.URL.Query().Get("cluster")); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}

View File

@ -20,7 +20,7 @@ type Metric struct {
// Connect to a nats server and subscribe to "updates". This is a blocking
// function. handleLine will be called for each line recieved via nats.
// Send `true` through the done channel for gracefull termination.
func ReceiveNats(conf *NatsConfig, handleLine func(dec *lineprotocol.Decoder) error, workers int, ctx context.Context) error {
func ReceiveNats(conf *NatsConfig, handleLine func(*lineprotocol.Decoder, string) error, workers int, ctx context.Context) error {
var opts []nats.Option
if conf.Username != "" && conf.Password != "" {
opts = append(opts, nats.UserInfo(conf.Username, conf.Password))
@ -44,7 +44,7 @@ func ReceiveNats(conf *NatsConfig, handleLine func(dec *lineprotocol.Decoder) er
go func() {
for m := range msgs {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err := handleLine(dec); err != nil {
if err := handleLine(dec, conf.ClusterTag); err != nil {
log.Printf("error: %s\n", err.Error())
}
}
@ -59,7 +59,7 @@ func ReceiveNats(conf *NatsConfig, handleLine func(dec *lineprotocol.Decoder) er
} else {
sub, err = nc.Subscribe(conf.SubscribeTo, func(m *nats.Msg) {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err := handleLine(dec); err != nil {
if err := handleLine(dec, conf.ClusterTag); err != nil {
log.Printf("error: %s\n", err.Error())
}
})
@ -104,7 +104,9 @@ func reorder(buf, prefix []byte) []byte {
}
}
func decodeLine(dec *lineprotocol.Decoder) error {
// Decode lines using dec and make write calls to the MemoryStore.
// If a line is missing its cluster tag, use clusterDefault as default.
func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error {
// Reduce allocations in loop:
t := time.Now()
metrics := make([]Metric, 0, 10)
@ -138,7 +140,7 @@ func decodeLine(dec *lineprotocol.Decoder) error {
}
typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0]
var cluster, host string
cluster, host := clusterDefault, ""
for {
key, val, err := dec.NextTag()
if err != nil {

View File

@ -10,7 +10,7 @@ import (
const TestDataClassicFormat string = `
m1,cluster=ctest,hostname=htest1,type=node value=1 123456789
m2,cluster=ctest,hostname=htest1,type=node value=2 123456789
m3,cluster=ctest,hostname=htest2,type=node value=3 123456789
m3,hostname=htest2,type=node value=3 123456789
m4,cluster=ctest,hostname=htest2,type=core,type-id=1 value=4 123456789
m4,cluster=ctest,hostname=htest2,type-id=2,type=core value=5 123456789
`
@ -29,7 +29,7 @@ func TestLineprotocolDecoder(t *testing.T) {
})
dec := lineprotocol.NewDecoderWithBytes([]byte(TestDataClassicFormat))
if err := decodeLine(dec); err != nil {
if err := decodeLine(dec, "ctest"); err != nil {
log.Fatal(err)
}

View File

@ -41,6 +41,9 @@ type NatsConfig struct {
// Channel name
SubscribeTo string `json:"subscribe-to"`
// Allow lines without a cluster tag, use this as default, optional
ClusterTag string `json:"cluster-tag"`
// Username/Password, optional
Username string `json:"username"`
Password string `json:"password"`

View File

@ -10,6 +10,11 @@ paths:
post:
operationId: 'writeMetrics'
description: 'Recieves metrics in the influx line-protocol using [this format](https://github.com/ClusterCockpit/cc-specifications/blob/master/metrics/lineprotocol_alternative.md)'
parameters:
- name: cluster
in: query
schema: { type: string }
description: "If the lines in the body do not have a cluster tag, use this value instead."
requestBody:
required: true
content: