mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2025-09-13 20:23:01 +02:00
Start a new api package
This commit is contained in:
326
internal/api/lineprotocol.go
Normal file
326
internal/api/lineprotocol.go
Normal file
@@ -0,0 +1,326 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/memstore"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/types"
|
||||
"github.com/influxdata/line-protocol/v2/lineprotocol"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type NatsConfig struct {
|
||||
// Address of the nats server
|
||||
Address string `json:"address"`
|
||||
|
||||
// Username/Password, optional
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
|
||||
Subscriptions []struct {
|
||||
// Channel name
|
||||
SubscribeTo string `json:"subscribe-to"`
|
||||
|
||||
// Allow lines without a cluster tag, use this as default, optional
|
||||
ClusterTag string `json:"cluster-tag"`
|
||||
} `json:"subscriptions"`
|
||||
}
|
||||
|
||||
// Currently unused, could be used to send messages via raw TCP.
|
||||
// Each connection is handled in it's own goroutine. This is a blocking function.
|
||||
func ReceiveRaw(ctx context.Context, listener net.Listener, handleLine func(*lineprotocol.Decoder, string) error) error {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
<-ctx.Done()
|
||||
if err := listener.Close(); err != nil {
|
||||
log.Printf("listener.Close(): %s", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
conn, err := listener.Accept()
|
||||
if err != nil {
|
||||
if errors.Is(err, net.ErrClosed) {
|
||||
break
|
||||
}
|
||||
|
||||
log.Printf("listener.Accept(): %s", err.Error())
|
||||
}
|
||||
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer conn.Close()
|
||||
|
||||
dec := lineprotocol.NewDecoder(conn)
|
||||
connctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
select {
|
||||
case <-connctx.Done():
|
||||
conn.Close()
|
||||
case <-ctx.Done():
|
||||
conn.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
if err := handleLine(dec, "default"); err != nil {
|
||||
if errors.Is(err, net.ErrClosed) {
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("%s: %s", conn.RemoteAddr().String(), err.Error())
|
||||
errmsg := make([]byte, 128)
|
||||
errmsg = append(errmsg, `error: `...)
|
||||
errmsg = append(errmsg, err.Error()...)
|
||||
errmsg = append(errmsg, '\n')
|
||||
conn.Write(errmsg)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Connect to a nats server and subscribe to "updates". This is a blocking
|
||||
// function. handleLine will be called for each line recieved via nats.
|
||||
// Send `true` through the done channel for gracefull termination.
|
||||
func ReceiveNats(conf *NatsConfig, handleLine func(*lineprotocol.Decoder, string) error, workers int, ctx context.Context) error {
|
||||
var opts []nats.Option
|
||||
if conf.Username != "" && conf.Password != "" {
|
||||
opts = append(opts, nats.UserInfo(conf.Username, conf.Password))
|
||||
}
|
||||
|
||||
nc, err := nats.Connect(conf.Address, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var subs []*nats.Subscription
|
||||
|
||||
msgs := make(chan *nats.Msg, workers*2)
|
||||
|
||||
for _, sc := range conf.Subscriptions {
|
||||
clusterTag := sc.ClusterTag
|
||||
var sub *nats.Subscription
|
||||
if workers > 1 {
|
||||
wg.Add(workers)
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go func() {
|
||||
for m := range msgs {
|
||||
dec := lineprotocol.NewDecoderWithBytes(m.Data)
|
||||
if err := handleLine(dec, clusterTag); err != nil {
|
||||
log.Printf("error: %s\n", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) {
|
||||
msgs <- m
|
||||
})
|
||||
} else {
|
||||
sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) {
|
||||
dec := lineprotocol.NewDecoderWithBytes(m.Data)
|
||||
if err := handleLine(dec, clusterTag); err != nil {
|
||||
log.Printf("error: %s\n", err.Error())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("NATS subscription to '%s' on '%s' established\n", sc.SubscribeTo, conf.Address)
|
||||
subs = append(subs, sub)
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
for _, sub := range subs {
|
||||
err = sub.Unsubscribe()
|
||||
if err != nil {
|
||||
log.Printf("NATS unsubscribe failed: %s", err.Error())
|
||||
}
|
||||
}
|
||||
close(msgs)
|
||||
wg.Wait()
|
||||
|
||||
nc.Close()
|
||||
log.Println("NATS connection closed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Place `prefix` in front of `buf` but if possible,
|
||||
// do that inplace in `buf`.
|
||||
func reorder(buf, prefix []byte) []byte {
|
||||
n := len(prefix)
|
||||
m := len(buf)
|
||||
if cap(buf) < m+n {
|
||||
return append(prefix[:n:n], buf...)
|
||||
} else {
|
||||
buf = buf[:n+m]
|
||||
for i := m - 1; i >= 0; i-- {
|
||||
buf[i+n] = buf[i]
|
||||
}
|
||||
for i := 0; i < n; i++ {
|
||||
buf[i] = prefix[i]
|
||||
}
|
||||
return buf
|
||||
}
|
||||
}
|
||||
|
||||
// Decode lines using dec and make write calls to the MemoryStore.
|
||||
// If a line is missing its cluster tag, use clusterDefault as default.
|
||||
func decodeLine(memoryStore *memstore.MemoryStore, dec *lineprotocol.Decoder, clusterDefault string) error {
|
||||
// Reduce allocations in loop:
|
||||
t := time.Now()
|
||||
metric, metricBuf := types.Metric{}, make([]byte, 0, 16)
|
||||
selector := make([]string, 0, 4)
|
||||
typeBuf, subTypeBuf := make([]byte, 0, 16), make([]byte, 0)
|
||||
|
||||
// Optimize for the case where all lines in a "batch" are about the same
|
||||
// cluster and host. By using `WriteToLevel` (level = host), we do not need
|
||||
// to take the root- and cluster-level lock as often.
|
||||
var lvl *memstore.Level = nil
|
||||
var prevCluster, prevHost string = "", ""
|
||||
|
||||
var ok bool
|
||||
for dec.Next() {
|
||||
rawmeasurement, err := dec.Measurement()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Needs to be copied because another call to dec.* would
|
||||
// invalidate the returned slice.
|
||||
metricBuf = append(metricBuf[:0], rawmeasurement...)
|
||||
|
||||
// The go compiler optimizes map[string(byteslice)] lookups:
|
||||
metric.Conf, ok = memoryStore.GetMetricConf(string(rawmeasurement))
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0]
|
||||
cluster, host := clusterDefault, ""
|
||||
for {
|
||||
key, val, err := dec.NextTag()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if key == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// The go compiler optimizes string([]byte{...}) == "...":
|
||||
switch string(key) {
|
||||
case "cluster":
|
||||
if string(val) == prevCluster {
|
||||
cluster = prevCluster
|
||||
} else {
|
||||
cluster = string(val)
|
||||
lvl = nil
|
||||
}
|
||||
case "hostname", "host":
|
||||
if string(val) == prevHost {
|
||||
host = prevHost
|
||||
} else {
|
||||
host = string(val)
|
||||
lvl = nil
|
||||
}
|
||||
case "type":
|
||||
if string(val) == "node" {
|
||||
break
|
||||
}
|
||||
|
||||
// We cannot be sure that the "type" tag comes before the "type-id" tag:
|
||||
if len(typeBuf) == 0 {
|
||||
typeBuf = append(typeBuf, val...)
|
||||
} else {
|
||||
typeBuf = reorder(typeBuf, val)
|
||||
}
|
||||
case "type-id":
|
||||
typeBuf = append(typeBuf, val...)
|
||||
case "subtype":
|
||||
// We cannot be sure that the "subtype" tag comes before the "stype-id" tag:
|
||||
if len(subTypeBuf) == 0 {
|
||||
subTypeBuf = append(subTypeBuf, val...)
|
||||
} else {
|
||||
subTypeBuf = reorder(typeBuf, val)
|
||||
}
|
||||
case "stype-id":
|
||||
subTypeBuf = append(subTypeBuf, val...)
|
||||
default:
|
||||
// Ignore unkown tags (cc-metric-collector might send us a unit for example that we do not need)
|
||||
// return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val))
|
||||
}
|
||||
}
|
||||
|
||||
// If the cluster or host changed, the lvl was set to nil
|
||||
if lvl == nil {
|
||||
selector = selector[:2]
|
||||
selector[0], selector[1] = cluster, host
|
||||
lvl = memoryStore.GetLevel(selector)
|
||||
prevCluster, prevHost = cluster, host
|
||||
}
|
||||
|
||||
// subtypes:
|
||||
selector = selector[:0]
|
||||
if len(typeBuf) > 0 {
|
||||
selector = append(selector, string(typeBuf)) // <- Allocation :(
|
||||
if len(subTypeBuf) > 0 {
|
||||
selector = append(selector, string(subTypeBuf))
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
key, val, err := dec.NextField()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if key == nil {
|
||||
break
|
||||
}
|
||||
|
||||
if string(key) != "value" {
|
||||
return fmt.Errorf("unkown field: '%s' (value: %#v)", string(key), val)
|
||||
}
|
||||
|
||||
if val.Kind() == lineprotocol.Float {
|
||||
metric.Value = types.Float(val.FloatV())
|
||||
} else if val.Kind() == lineprotocol.Int {
|
||||
metric.Value = types.Float(val.IntV())
|
||||
} else if val.Kind() == lineprotocol.Uint {
|
||||
metric.Value = types.Float(val.UintV())
|
||||
} else {
|
||||
return fmt.Errorf("unsupported value type in message: %s", val.Kind().String())
|
||||
}
|
||||
}
|
||||
|
||||
if t, err = dec.Time(lineprotocol.Second, t); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := memoryStore.WriteToLevel(lvl, selector, t.Unix(), []types.Metric{metric}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
147
internal/api/lineprotocol_test.go
Normal file
147
internal/api/lineprotocol_test.go
Normal file
@@ -0,0 +1,147 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/memstore"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/types"
|
||||
"github.com/influxdata/line-protocol/v2/lineprotocol"
|
||||
)
|
||||
|
||||
const TestDataClassicFormat string = `
|
||||
m1,cluster=ctest,hostname=htest1,type=node value=1 123456789
|
||||
m2,cluster=ctest,hostname=htest1,type=node value=2 123456789
|
||||
m3,hostname=htest2,type=node value=3 123456789
|
||||
m4,cluster=ctest,hostname=htest2,type=core,type-id=1 value=4 123456789
|
||||
m4,cluster=ctest,hostname=htest2,type-id=2,type=core value=5 123456789
|
||||
`
|
||||
|
||||
const BenchmarkLineBatch string = `
|
||||
nm1,cluster=ctest,hostname=htest1,type=node value=123.0 123456789
|
||||
nm2,cluster=ctest,hostname=htest1,type=node value=123.0 123456789
|
||||
nm3,cluster=ctest,hostname=htest1,type=node value=123.0 123456789
|
||||
nm4,cluster=ctest,hostname=htest1,type=node value=123.0 123456789
|
||||
nm5,cluster=ctest,hostname=htest1,type=node value=123.0 123456789
|
||||
nm6,cluster=ctest,hostname=htest1,type=node value=123.0 123456789
|
||||
nm7,cluster=ctest,hostname=htest1,type=node value=123.0 123456789
|
||||
nm8,cluster=ctest,hostname=htest1,type=node value=123.0 123456789
|
||||
nm9,cluster=ctest,hostname=htest1,type=node value=123.0 123456789
|
||||
cm1,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789
|
||||
cm2,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789
|
||||
cm3,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789
|
||||
cm4,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789
|
||||
cm5,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789
|
||||
cm6,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789
|
||||
cm7,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789
|
||||
cm8,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789
|
||||
cm9,cluster=ctest,hostname=htest1,type=core,type-id=1 value=234.0 123456789
|
||||
cm1,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789
|
||||
cm2,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789
|
||||
cm3,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789
|
||||
cm4,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789
|
||||
cm5,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789
|
||||
cm6,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789
|
||||
cm7,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789
|
||||
cm8,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789
|
||||
cm9,cluster=ctest,hostname=htest1,type=core,type-id=2 value=345.0 123456789
|
||||
cm1,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789
|
||||
cm2,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789
|
||||
cm3,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789
|
||||
cm4,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789
|
||||
cm5,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789
|
||||
cm6,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789
|
||||
cm7,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789
|
||||
cm8,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789
|
||||
cm9,cluster=ctest,hostname=htest1,type=core,type-id=3 value=456.0 123456789
|
||||
cm1,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789
|
||||
cm2,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789
|
||||
cm3,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789
|
||||
cm4,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789
|
||||
cm5,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789
|
||||
cm6,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789
|
||||
cm7,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789
|
||||
cm8,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789
|
||||
cm9,cluster=ctest,hostname=htest1,type=core,type-id=4 value=567.0 123456789
|
||||
`
|
||||
|
||||
/*
|
||||
func TestLineprotocolDecoder(t *testing.T) {
|
||||
prevMemoryStore := memoryStore
|
||||
t.Cleanup(func() {
|
||||
memoryStore = prevMemoryStore
|
||||
})
|
||||
|
||||
memoryStore = NewMemoryStore(map[string]MetricConfig{
|
||||
"m1": {Frequency: 1},
|
||||
"m2": {Frequency: 1},
|
||||
"m3": {Frequency: 1},
|
||||
"m4": {Frequency: 1},
|
||||
})
|
||||
|
||||
dec := lineprotocol.NewDecoderWithBytes([]byte(TestDataClassicFormat))
|
||||
if err := decodeLine(dec, "ctest"); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// memoryStore.DebugDump(bufio.NewWriter(os.Stderr))
|
||||
|
||||
h1 := memoryStore.GetLevel([]string{"ctest", "htest1"})
|
||||
h1b1 := h1.metrics[memoryStore.metrics["m1"].offset]
|
||||
h1b2 := h1.metrics[memoryStore.metrics["m2"].offset]
|
||||
if h1b1.data[0] != 1.0 || h1b2.data[0] != 2.0 {
|
||||
log.Fatal()
|
||||
}
|
||||
|
||||
h2 := memoryStore.GetLevel([]string{"ctest", "htest2"})
|
||||
h2b3 := h2.metrics[memoryStore.metrics["m3"].offset]
|
||||
if h2b3.data[0] != 3.0 {
|
||||
log.Fatal()
|
||||
}
|
||||
|
||||
h2c1 := memoryStore.GetLevel([]string{"ctest", "htest2", "core1"})
|
||||
h2c1b4 := h2c1.metrics[memoryStore.metrics["m4"].offset]
|
||||
h2c2 := memoryStore.GetLevel([]string{"ctest", "htest2", "core2"})
|
||||
h2c2b4 := h2c2.metrics[memoryStore.metrics["m4"].offset]
|
||||
if h2c1b4.data[0] != 4.0 || h2c2b4.data[0] != 5.0 {
|
||||
log.Fatal()
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
func BenchmarkLineprotocolDecoder(b *testing.B) {
|
||||
b.StopTimer()
|
||||
memoryStore := memstore.NewMemoryStore(map[string]types.MetricConfig{
|
||||
"nm1": {Frequency: 1},
|
||||
"nm2": {Frequency: 1},
|
||||
"nm3": {Frequency: 1},
|
||||
"nm4": {Frequency: 1},
|
||||
"nm5": {Frequency: 1},
|
||||
"nm6": {Frequency: 1},
|
||||
"nm7": {Frequency: 1},
|
||||
"nm8": {Frequency: 1},
|
||||
"nm9": {Frequency: 1},
|
||||
"cm1": {Frequency: 1},
|
||||
"cm2": {Frequency: 1},
|
||||
"cm3": {Frequency: 1},
|
||||
"cm4": {Frequency: 1},
|
||||
"cm5": {Frequency: 1},
|
||||
"cm6": {Frequency: 1},
|
||||
"cm7": {Frequency: 1},
|
||||
"cm8": {Frequency: 1},
|
||||
"cm9": {Frequency: 1},
|
||||
})
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
data := []byte(BenchmarkLineBatch)
|
||||
data = bytes.ReplaceAll(data, []byte("123456789"), []byte(strconv.Itoa(i+123456789)))
|
||||
dec := lineprotocol.NewDecoderWithBytes(data)
|
||||
|
||||
b.StartTimer()
|
||||
if err := decodeLine(memoryStore, dec, "ctest"); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
b.StopTimer()
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user