Continue restructuring. Intermediate state.

This commit is contained in:
2024-05-06 09:27:28 +02:00
parent e1e6694656
commit b2528f958c
13 changed files with 795 additions and 751 deletions

View File

@@ -10,21 +10,17 @@ import (
"time"
"github.com/ClusterCockpit/cc-metric-store/internal/config"
"github.com/ClusterCockpit/cc-metric-store/internal/memstore"
"github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
"github.com/ClusterCockpit/cc-metric-store/internal/util"
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/nats-io/nats.go"
)
type Metric struct {
Name string
Value util.Float
mc config.MetricConfig
}
// Currently unused, could be used to send messages via raw TCP.
// Each connection is handled in it's own goroutine. This is a blocking function.
func ReceiveRaw(ctx context.Context, listener net.Listener, handleLine func(*lineprotocol.Decoder, string) error) error {
func ReceiveRaw(ctx context.Context,
listener net.Listener,
handleLine func(*lineprotocol.Decoder, string) error,
) error {
var wg sync.WaitGroup
wg.Add(1)
@@ -86,7 +82,11 @@ func ReceiveRaw(ctx context.Context, listener net.Listener, handleLine func(*lin
// Connect to a nats server and subscribe to "updates". This is a blocking
// function. handleLine will be called for each line recieved via nats.
// Send `true` through the done channel for gracefull termination.
func ReceiveNats(conf *config.NatsConfig, handleLine func(*lineprotocol.Decoder, string) error, workers int, ctx context.Context) error {
func ReceiveNats(conf *config.NatsConfig,
ms *memorystore.MemoryStore,
workers int,
ctx context.Context,
) error {
var opts []nats.Option
if conf.Username != "" && conf.Password != "" {
opts = append(opts, nats.UserInfo(conf.Username, conf.Password))
@@ -113,7 +113,7 @@ func ReceiveNats(conf *config.NatsConfig, handleLine func(*lineprotocol.Decoder,
go func() {
for m := range msgs {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err := handleLine(dec, clusterTag); err != nil {
if err := decodeLine(dec, ms, clusterTag); err != nil {
log.Printf("error: %s\n", err.Error())
}
}
@@ -128,7 +128,7 @@ func ReceiveNats(conf *config.NatsConfig, handleLine func(*lineprotocol.Decoder,
} else {
sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
if err := handleLine(dec, clusterTag); err != nil {
if err := decodeLine(dec, ms, clusterTag); err != nil {
log.Printf("error: %s\n", err.Error())
}
})
@@ -177,17 +177,20 @@ func reorder(buf, prefix []byte) []byte {
// Decode lines using dec and make write calls to the MemoryStore.
// If a line is missing its cluster tag, use clusterDefault as default.
func decodeLine(dec *lineprotocol.Decoder, memoryStore *memstore.MemoryStore, clusterDefault string) error {
func decodeLine(dec *lineprotocol.Decoder,
ms *memorystore.MemoryStore,
clusterDefault string,
) error {
// Reduce allocations in loop:
t := time.Now()
metric, metricBuf := Metric{}, make([]byte, 0, 16)
metric, metricBuf := memorystore.Metric{}, make([]byte, 0, 16)
selector := make([]string, 0, 4)
typeBuf, subTypeBuf := make([]byte, 0, 16), make([]byte, 0)
// Optimize for the case where all lines in a "batch" are about the same
// cluster and host. By using `WriteToLevel` (level = host), we do not need
// to take the root- and cluster-level lock as often.
var lvl *level = nil
var lvl *memorystore.Level = nil
var prevCluster, prevHost string = "", ""
var ok bool
@@ -202,7 +205,7 @@ func decodeLine(dec *lineprotocol.Decoder, memoryStore *memstore.MemoryStore, cl
metricBuf = append(metricBuf[:0], rawmeasurement...)
// The go compiler optimizes map[string(byteslice)] lookups:
metric.mc, ok = memoryStore.metrics[string(rawmeasurement)]
metric.MetricConfig, ok = ms.Metrics[string(rawmeasurement)]
if !ok {
continue
}
@@ -266,7 +269,7 @@ func decodeLine(dec *lineprotocol.Decoder, memoryStore *memstore.MemoryStore, cl
if lvl == nil {
selector = selector[:2]
selector[0], selector[1] = cluster, host
lvl = memoryStore.GetLevel(selector)
lvl = ms.GetLevel(selector)
prevCluster, prevHost = cluster, host
}
@@ -308,7 +311,7 @@ func decodeLine(dec *lineprotocol.Decoder, memoryStore *memstore.MemoryStore, cl
return err
}
if err := memoryStore.WriteToLevel(lvl, selector, t.Unix(), []Metric{metric}); err != nil {
if err := ms.WriteToLevel(lvl, selector, t.Unix(), []memorystore.Metric{metric}); err != nil {
return err
}
}