Copy all byte slices from decoder

While trying to optimize things I had overlooked this note in the lineprotocol documentation: "the byte slices returned by the Decoder methods are only valid until the next call to any other Decode method."
Lou Knauer 2022-01-24 09:50:12 +01:00
parent bf7c33513b
commit 4a78a24034
3 changed files with 162 additions and 54 deletions
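For context (not code from this commit): a slice returned by e.g. Decoder.NextTag aliases the decoder's internal buffer, so it has to be copied (string(val) or append) before the next Decode call. A minimal sketch of the pitfall, using the same test data as below:

    package main

    import (
        "fmt"

        "github.com/influxdata/line-protocol/v2/lineprotocol"
    )

    func main() {
        payload := []byte("m1,cluster=ctest,hostname=htest1,type=node value=1 123456789\n")
        dec := lineprotocol.NewDecoderWithBytes(payload)
        for dec.Next() {
            if _, err := dec.Measurement(); err != nil {
                return
            }

            key, val, err := dec.NextTag()
            if err != nil || key == nil {
                continue
            }

            unsafeRef := val        // aliases the decoder's buffer
            safeCopy := string(val) // copies the bytes, stays valid

            dec.NextField() // any further Decode call may reuse that buffer,
            // so unsafeRef may no longer hold "ctest" while safeCopy still does
            fmt.Println(string(unsafeRef), safeCopy)
        }
    }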


@@ -50,6 +50,7 @@ func (f *Float) UnmarshalJSON(input []byte) error {
 type Metric struct {
     Name  string
+    minfo metricInfo
     Value Float
 }
@@ -116,6 +117,25 @@ func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder) error
     return nil
 }

+// Place `prefix` in front of `buf`; if `buf` has enough
+// capacity, do that in place in `buf`.
+func reorder(buf, prefix []byte) []byte {
+    n := len(prefix)
+    m := len(buf)
+    if cap(buf) < m+n {
+        return append(prefix[:n:n], buf...)
+    } else {
+        buf = buf[:n+m]
+        for i := m - 1; i >= 0; i-- {
+            buf[i+n] = buf[i]
+        }
+        for i := 0; i < n; i++ {
+            buf[i] = prefix[i]
+        }
+        return buf
+    }
+}
+
 func decodeLine(dec *lineprotocol.Decoder) error {
     // Reduce allocations in loop:
     t := time.Now()
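Aside (not part of the commit): reorder exists because line-protocol tags can arrive in either order, e.g. the test data below has an m4 line with type-id before type. A sketch of how the decode loop then assembles the selector key, assuming the reorder helper above is in scope:

    // "type-id" tag decoded first, "type" tag decoded later and moved in front.
    typeBuf := []byte(nil)
    typeBuf = append(typeBuf, "2"...)
    typeBuf = reorder(typeBuf, []byte("core"))
    // typeBuf now holds "core2", the level name used in the selector.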
@@ -126,17 +146,31 @@ func decodeLine(dec *lineprotocol.Decoder) error {
     // Optimize for the case where all lines in a "batch" are about the same
     // cluster and host. By using `WriteToLevel` (level = host), we do not need
     // to take the root- and cluster-level lock as often.
-    var hostLevel *level = nil
+    var lvl *level = nil
     var prevCluster, prevHost string = "", ""
     for dec.Next() {
+        metrics = metrics[:0]
         rawmeasurement, err := dec.Measurement()
         if err != nil {
            return err
         }

+        // A denser lp format is supported if the measurement is 'data'.
+        // In that case, the field keys are used as metric names.
+        if string(rawmeasurement) != "data" {
+            minfo, ok := memoryStore.metrics[string(rawmeasurement)]
+            if !ok {
+                continue
+            }
+            metrics = append(metrics, Metric{
+                minfo: minfo,
+            })
+        }
+
+        typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0]
         var cluster, host string
-        var typeName, typeId, subType, subTypeId []byte
         for {
             key, val, err := dec.NextTag()
             if err != nil {
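The comment in the hunk above refers to the denser format: if the measurement is 'data', each field key names a metric, so one line can carry several metrics. The test file below only exercises the classic format; a dense-format counterpart would presumably look something like this (illustrative guess, not part of the commit):

    // Hypothetical dense-format sample; field keys (m1, m2, m4) are metric names.
    const TestDataDenseFormat string = `
    data,cluster=ctest,hostname=htest1,type=node m1=1,m2=2 123456789
    data,cluster=ctest,hostname=htest2,type=core,type-id=1 m4=4 123456789
    `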
@@ -153,54 +187,57 @@ func decodeLine(dec *lineprotocol.Decoder) error {
                     cluster = prevCluster
                 } else {
                     cluster = string(val)
+                    lvl = nil
                 }
-            case "hostname":
+            case "hostname", "host":
                 if string(val) == prevHost {
                     host = prevHost
                 } else {
                     host = string(val)
+                    lvl = nil
                 }
             case "type":
-                typeName = val
+                if string(val) == "node" {
+                    break
+                }
+
+                if len(typeBuf) == 0 {
+                    typeBuf = append(typeBuf, val...)
+                } else {
+                    typeBuf = reorder(typeBuf, val)
+                }
             case "type-id":
-                typeId = val
+                typeBuf = append(typeBuf, val...)
             case "subtype":
-                subType = val
+                if len(subTypeBuf) == 0 {
+                    subTypeBuf = append(subTypeBuf, val...)
+                } else {
+                    subTypeBuf = reorder(subTypeBuf, val)
+                }
             case "stype-id":
-                subTypeId = val
+                subTypeBuf = append(subTypeBuf, val...)
             default:
                 // Ignore unknown tags (cc-metric-collector might, for example, send us a unit that we do not need)
                 // return fmt.Errorf("unknown tag: '%s' (value: '%s')", string(key), string(val))
            }
        }

-        if hostLevel == nil || prevCluster != cluster || prevHost != host {
-            prevCluster = cluster
-            prevHost = host
+        if lvl == nil {
             selector = selector[:2]
-            selector[0] = cluster
-            selector[1] = host
-            hostLevel = memoryStore.root.findLevelOrCreate(selector, len(memoryStore.metrics))
+            selector[0], selector[1] = cluster, host
+            lvl = memoryStore.GetLevel(selector)
+            prevCluster, prevHost = cluster, host
         }

         selector = selector[:0]
-        if len(typeId) > 0 {
-            typeBuf = typeBuf[:0]
-            typeBuf = append(typeBuf, typeName...)
-            typeBuf = append(typeBuf, typeId...)
+        if len(typeBuf) > 0 {
             selector = append(selector, string(typeBuf)) // <- Allocation :(
-            if len(subTypeId) > 0 {
-                subTypeBuf = subTypeBuf[:0]
-                subTypeBuf = append(subTypeBuf, subType...)
-                subTypeBuf = append(subTypeBuf, subTypeId...)
+            if len(subTypeBuf) > 0 {
                 selector = append(selector, string(subTypeBuf))
             }
         }

-        metrics = metrics[:0]
-
-        // A denser lp format is supported if the measurement is 'data'.
-        // In that case, the field keys are used as metric names.
-        if string(rawmeasurement) == "data" {
+        if len(metrics) == 0 {
             for {
                 key, val, err := dec.NextField()
                 if err != nil {
@@ -220,13 +257,17 @@ func decodeLine(dec *lineprotocol.Decoder) error {
                     return fmt.Errorf("unsupported value type in message: %s", val.Kind().String())
                 }

+                minfo, ok := memoryStore.metrics[string(key)]
+                if !ok {
+                    continue
+                }
+
                 metrics = append(metrics, Metric{
-                    Name:  string(key), // <- Allocation :(
+                    minfo: minfo,
                     Value: value,
                 })
             }
         } else {
-            measurement := string(rawmeasurement) // <- Allocation :(
             var value Float
             for {
                 key, val, err := dec.NextField()
@@ -251,10 +292,7 @@ func decodeLine(dec *lineprotocol.Decoder) error {
                 }
             }

-            metrics = append(metrics, Metric{
-                Name:  measurement,
-                Value: value,
-            })
+            metrics[0].Value = value
         }

         t, err = dec.Time(lineprotocol.Second, t)
@@ -263,7 +301,7 @@ func decodeLine(dec *lineprotocol.Decoder) error {
         }

         // log.Printf("write: %s (%v) -> %v\n", string(measurement), selector, value)
-        if err := memoryStore.WriteToLevel(hostLevel, selector, t.Unix(), metrics); err != nil {
+        if err := memoryStore.WriteToLevel(lvl, selector, t.Unix(), metrics); err != nil {
             return err
         }
     }

lineprotocol_test.go (new file, 58 additions)

@@ -0,0 +1,58 @@
package main

import (
    "log"
    "testing"

    "github.com/influxdata/line-protocol/v2/lineprotocol"
)

const TestDataClassicFormat string = `
m1,cluster=ctest,hostname=htest1,type=node value=1 123456789
m2,cluster=ctest,hostname=htest1,type=node value=2 123456789
m3,cluster=ctest,hostname=htest2,type=node value=3 123456789
m4,cluster=ctest,hostname=htest2,type=core,type-id=1 value=4 123456789
m4,cluster=ctest,hostname=htest2,type-id=2,type=core value=5 123456789
`

func TestLineprotocolDecoder(t *testing.T) {
    prevMemoryStore := memoryStore
    t.Cleanup(func() {
        memoryStore = prevMemoryStore
    })

    memoryStore = NewMemoryStore(map[string]MetricConfig{
        "m1": {Frequency: 1},
        "m2": {Frequency: 1},
        "m3": {Frequency: 1},
        "m4": {Frequency: 1},
    })

    dec := lineprotocol.NewDecoderWithBytes([]byte(TestDataClassicFormat))
    if err := decodeLine(dec); err != nil {
        log.Fatal(err)
    }

    // memoryStore.DebugDump(bufio.NewWriter(os.Stderr))

    h1 := memoryStore.GetLevel([]string{"ctest", "htest1"})
    h1b1 := h1.metrics[memoryStore.metrics["m1"].offset]
    h1b2 := h1.metrics[memoryStore.metrics["m2"].offset]
    if h1b1.data[0] != 1.0 || h1b2.data[0] != 2.0 {
        log.Fatal()
    }

    h2 := memoryStore.GetLevel([]string{"ctest", "htest2"})
    h2b3 := h2.metrics[memoryStore.metrics["m3"].offset]
    if h2b3.data[0] != 3.0 {
        log.Fatal()
    }

    h2c1 := memoryStore.GetLevel([]string{"ctest", "htest2", "core1"})
    h2c1b4 := h2c1.metrics[memoryStore.metrics["m4"].offset]
    h2c2 := memoryStore.GetLevel([]string{"ctest", "htest2", "core2"})
    h2c2b4 := h2c2.metrics[memoryStore.metrics["m4"].offset]
    if h2c1b4.data[0] != 4.0 || h2c2b4.data[0] != 5.0 {
        log.Fatal()
    }
}


@@ -313,23 +313,21 @@ const (
     AvgAggregation
 )

+type metricInfo struct {
+    offset      int
+    aggregation AggregationStrategy
+    frequency   int64
+}
+
 type MemoryStore struct {
     root    level // root of the tree structure
-    metrics map[string]struct {
-        offset      int
-        aggregation AggregationStrategy
-        frequency   int64
-    }
+    metrics map[string]metricInfo
 }

 // Return a new, initialized instance of a MemoryStore.
 // Will panic if values in the metric configurations are invalid.
 func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore {
-    ms := make(map[string]struct {
-        offset      int
-        aggregation AggregationStrategy
-        frequency   int64
-    })
+    ms := make(map[string]metricInfo)

     offset := 0
     for key, config := range metrics {
@@ -342,11 +340,11 @@ func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore {
             panic("invalid aggregation strategy: " + config.Aggregation)
         }

-        ms[key] = struct {
-            offset      int
-            aggregation AggregationStrategy
-            frequency   int64
-        }{
+        if config.Frequency == 0 {
+            panic("invalid frequency")
+        }
+
+        ms[key] = metricInfo{
             offset:      offset,
             aggregation: aggregation,
             frequency:   config.Frequency,
@@ -367,26 +365,40 @@ func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore {
 // Write all values in `metrics` to the level specified by `selector` for time `ts`.
 // Look at `findLevelOrCreate` for how selectors work.
 func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error {
+    var ok bool
+    for i, metric := range metrics {
+        if metric.minfo.frequency == 0 {
+            metric.minfo, ok = m.metrics[metric.Name]
+            if !ok {
+                metric.minfo.frequency = 0
+            }
+            metrics[i] = metric
+        }
+    }
+
     return m.WriteToLevel(&m.root, selector, ts, metrics)
 }

+func (m *MemoryStore) GetLevel(selector []string) *level {
+    return m.root.findLevelOrCreate(selector, len(m.metrics))
+}
+
+// Assumes that `minfo` in `metrics` is filled in!
 func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metrics []Metric) error {
     l = l.findLevelOrCreate(selector, len(m.metrics))
     l.lock.Lock()
     defer l.lock.Unlock()

     for _, metric := range metrics {
-        minfo, ok := m.metrics[metric.Name]
-        if !ok {
-            // return errors.New("Unknown metric: " + metric.Name)
+        if metric.minfo.frequency == 0 {
             continue
         }

-        b := l.metrics[minfo.offset]
+        b := l.metrics[metric.minfo.offset]
         if b == nil {
             // First write to this metric and level
-            b = newBuffer(ts, minfo.frequency)
-            l.metrics[minfo.offset] = b
+            b = newBuffer(ts, metric.minfo.frequency)
+            l.metrics[metric.minfo.offset] = b
         }

         nb, err := b.write(ts, metric.Value)
@@ -396,7 +408,7 @@ func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metrics []Metric) error {
         // Last write created a new buffer...
         if b != nb {
-            l.metrics[minfo.offset] = nb
+            l.metrics[metric.minfo.offset] = nb
         }
     }

     return nil
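With this change there are two write paths: Write still resolves the metricInfo from Metric.Name before delegating, while GetLevel plus WriteToLevel let a hot loop (like decodeLine) resolve the level once and pass pre-filled minfo, skipping the per-metric map lookup. A rough usage sketch under those assumptions (selector values and variable names are made up):

    ts := time.Now().Unix()

    // Convenience path: metrics carry only names; Write fills in minfo itself.
    if err := memoryStore.Write([]string{"ctest", "htest1"}, ts, []Metric{
        {Name: "m1", Value: 42.0},
    }); err != nil {
        log.Fatal(err)
    }

    // Hot path: resolve level and minfo once, then write with minfo pre-filled.
    lvl := memoryStore.GetLevel([]string{"ctest", "htest1"})
    minfo := memoryStore.metrics["m1"]
    if err := memoryStore.WriteToLevel(lvl, []string{}, ts, []Metric{
        {minfo: minfo, Value: 42.0},
    }); err != nil {
        log.Fatal(err)
    }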