Attempt to minimize heap allocations by using caching

This commit is contained in:
2026-02-25 07:52:02 +01:00
parent 3555fb6255
commit 9c0104a252

View File

@@ -6,6 +6,7 @@
package metricstore package metricstore
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"sync" "sync"
@@ -94,6 +95,28 @@ func reorder(buf, prefix []byte) []byte {
} }
} }
// decodeState bundles the scratch buffers that DecodeLine reuses from line to
// line, so they can be recycled through decodeStatePool instead of being
// allocated again on every call.
type decodeState struct {
	// metricBuf receives a copy of the raw measurement name of the current
	// line, because the slice handed out by the decoder is invalidated by the
	// next decoder call.
	metricBuf []byte

	// selector is the reusable selector slice: it first holds {cluster, host}
	// for the level lookup and is then rebuilt as {type[, subtype]} for the
	// write.
	selector []string

	// typeBuf collects the "type" and "type-id" tag values of the current
	// line; subTypeBuf does the same for "subtype" and "stype-id".
	typeBuf, subTypeBuf []byte

	// prevTypeBytes remembers the typeBuf contents that were last converted
	// to a string, and prevTypeStr is the result of that conversion. While
	// typeBuf stays byte-equal to prevTypeBytes, prevTypeStr is reused and no
	// new string is allocated.
	prevTypeBytes []byte
	prevTypeStr   string

	// prevSubTypeBytes / prevSubTypeStr are the same one-entry cache for
	// subTypeBuf.
	prevSubTypeBytes []byte
	prevSubTypeStr   string
}
// decodeStatePool hands out reusable *decodeState values. DecodeLine takes one
// at the start of a call and returns it with a deferred Put, so the scratch
// buffers (and whatever capacity they have grown to) are shared between calls.
//
// NOTE(review): no reset of the prev* fields is visible around Get/Put, so the
// cached string conversions presumably carry over to the next user of the
// state — that looks intentional, confirm against the full DecodeLine body.
var decodeStatePool = sync.Pool{
	// New builds a fresh state with small pre-sized buffers; the prev* cache
	// fields start out at their zero values.
	New: func() any {
		st := new(decodeState)
		st.metricBuf = make([]byte, 0, 16)
		st.selector = make([]string, 0, 4)
		st.typeBuf = make([]byte, 0, 16)
		st.subTypeBuf = make([]byte, 0, 16)
		return st
	},
}
// Decode lines using dec and make write calls to the MemoryStore. // Decode lines using dec and make write calls to the MemoryStore.
// If a line is missing its cluster tag, use clusterDefault as default. // If a line is missing its cluster tag, use clusterDefault as default.
func DecodeLine(dec *lineprotocol.Decoder, func DecodeLine(dec *lineprotocol.Decoder,
@@ -102,9 +125,9 @@ func DecodeLine(dec *lineprotocol.Decoder,
) error { ) error {
// Reduce allocations in loop: // Reduce allocations in loop:
t := time.Now() t := time.Now()
metric, metricBuf := Metric{}, make([]byte, 0, 16) metric := Metric{}
selector := make([]string, 0, 4) st := decodeStatePool.Get().(*decodeState)
typeBuf, subTypeBuf := make([]byte, 0, 16), make([]byte, 0) defer decodeStatePool.Put(st)
// Optimize for the case where all lines in a "batch" are about the same // Optimize for the case where all lines in a "batch" are about the same
// cluster and host. By using `WriteToLevel` (level = host), we do not need // cluster and host. By using `WriteToLevel` (level = host), we do not need
@@ -121,7 +144,7 @@ func DecodeLine(dec *lineprotocol.Decoder,
// Needs to be copied because another call to dec.* would // Needs to be copied because another call to dec.* would
// invalidate the returned slice. // invalidate the returned slice.
metricBuf = append(metricBuf[:0], rawmeasurement...) st.metricBuf = append(st.metricBuf[:0], rawmeasurement...)
// The go compiler optimizes map[string(byteslice)] lookups: // The go compiler optimizes map[string(byteslice)] lookups:
metric.MetricConfig, ok = ms.Metrics[string(rawmeasurement)] metric.MetricConfig, ok = ms.Metrics[string(rawmeasurement)]
@@ -129,7 +152,7 @@ func DecodeLine(dec *lineprotocol.Decoder,
continue continue
} }
typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0] st.typeBuf, st.subTypeBuf = st.typeBuf[:0], st.subTypeBuf[:0]
cluster, host := clusterDefault, "" cluster, host := clusterDefault, ""
for { for {
key, val, err := dec.NextTag() key, val, err := dec.NextTag()
@@ -162,41 +185,49 @@ func DecodeLine(dec *lineprotocol.Decoder,
} }
// We cannot be sure that the "type" tag comes before the "type-id" tag: // We cannot be sure that the "type" tag comes before the "type-id" tag:
if len(typeBuf) == 0 { if len(st.typeBuf) == 0 {
typeBuf = append(typeBuf, val...) st.typeBuf = append(st.typeBuf, val...)
} else { } else {
typeBuf = reorder(typeBuf, val) st.typeBuf = reorder(st.typeBuf, val)
} }
case "type-id": case "type-id":
typeBuf = append(typeBuf, val...) st.typeBuf = append(st.typeBuf, val...)
case "subtype": case "subtype":
// We cannot be sure that the "subtype" tag comes before the "stype-id" tag: // We cannot be sure that the "subtype" tag comes before the "stype-id" tag:
if len(subTypeBuf) == 0 { if len(st.subTypeBuf) == 0 {
subTypeBuf = append(subTypeBuf, val...) st.subTypeBuf = append(st.subTypeBuf, val...)
} else { } else {
subTypeBuf = reorder(subTypeBuf, val) st.subTypeBuf = reorder(st.subTypeBuf, val)
// subTypeBuf = reorder(typeBuf, val)
} }
case "stype-id": case "stype-id":
subTypeBuf = append(subTypeBuf, val...) st.subTypeBuf = append(st.subTypeBuf, val...)
default: default:
} }
} }
// If the cluster or host changed, the lvl was set to nil // If the cluster or host changed, the lvl was set to nil
if lvl == nil { if lvl == nil {
selector = selector[:2] st.selector = st.selector[:2]
selector[0], selector[1] = cluster, host st.selector[0], st.selector[1] = cluster, host
lvl = ms.GetLevel(selector) lvl = ms.GetLevel(st.selector)
prevCluster, prevHost = cluster, host prevCluster, prevHost = cluster, host
} }
// subtypes: // subtypes: cache []byte→string conversions; messages in a batch typically
selector = selector[:0] // share the same type/subtype so the hit rate is very high.
if len(typeBuf) > 0 { st.selector = st.selector[:0]
selector = append(selector, string(typeBuf)) // <- Allocation :( if len(st.typeBuf) > 0 {
if len(subTypeBuf) > 0 { if !bytes.Equal(st.typeBuf, st.prevTypeBytes) {
selector = append(selector, string(subTypeBuf)) st.prevTypeBytes = append(st.prevTypeBytes[:0], st.typeBuf...)
st.prevTypeStr = string(st.typeBuf)
}
st.selector = append(st.selector, st.prevTypeStr)
if len(st.subTypeBuf) > 0 {
if !bytes.Equal(st.subTypeBuf, st.prevSubTypeBytes) {
st.prevSubTypeBytes = append(st.prevSubTypeBytes[:0], st.subTypeBuf...)
st.prevSubTypeStr = string(st.subTypeBuf)
}
st.selector = append(st.selector, st.prevSubTypeStr)
} }
} }
@@ -244,7 +275,7 @@ func DecodeLine(dec *lineprotocol.Decoder,
time := t.Unix() time := t.Unix()
if err := ms.WriteToLevel(lvl, selector, time, []Metric{metric}); err != nil { if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil {
return err return err
} }
} }