From 9c0104a2529d77f268808a2538f2540eb3f30e19 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 25 Feb 2026 07:52:02 +0100 Subject: [PATCH] Attempt to minimize heap allocations by using caching --- pkg/metricstore/lineprotocol.go | 79 +++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 24 deletions(-) diff --git a/pkg/metricstore/lineprotocol.go b/pkg/metricstore/lineprotocol.go index 67b6ae08..8e969aa7 100644 --- a/pkg/metricstore/lineprotocol.go +++ b/pkg/metricstore/lineprotocol.go @@ -6,6 +6,7 @@ package metricstore import ( + "bytes" "context" "fmt" "sync" @@ -94,6 +95,28 @@ func reorder(buf, prefix []byte) []byte { } } +type decodeState struct { + metricBuf []byte + selector []string + typeBuf []byte + subTypeBuf []byte + prevTypeBytes []byte + prevTypeStr string + prevSubTypeBytes []byte + prevSubTypeStr string +} + +var decodeStatePool = sync.Pool{ + New: func() any { + return &decodeState{ + metricBuf: make([]byte, 0, 16), + selector: make([]string, 0, 4), + typeBuf: make([]byte, 0, 16), + subTypeBuf: make([]byte, 0, 16), + } + }, +} + // Decode lines using dec and make write calls to the MemoryStore. // If a line is missing its cluster tag, use clusterDefault as default. func DecodeLine(dec *lineprotocol.Decoder, @@ -102,9 +125,9 @@ func DecodeLine(dec *lineprotocol.Decoder, ) error { // Reduce allocations in loop: t := time.Now() - metric, metricBuf := Metric{}, make([]byte, 0, 16) - selector := make([]string, 0, 4) - typeBuf, subTypeBuf := make([]byte, 0, 16), make([]byte, 0) + metric := Metric{} + st := decodeStatePool.Get().(*decodeState) + defer decodeStatePool.Put(st) // Optimize for the case where all lines in a "batch" are about the same // cluster and host. By using `WriteToLevel` (level = host), we do not need @@ -121,7 +144,7 @@ func DecodeLine(dec *lineprotocol.Decoder, // Needs to be copied because another call to dec.* would // invalidate the returned slice. - metricBuf = append(metricBuf[:0], rawmeasurement...) + st.metricBuf = append(st.metricBuf[:0], rawmeasurement...) // The go compiler optimizes map[string(byteslice)] lookups: metric.MetricConfig, ok = ms.Metrics[string(rawmeasurement)] @@ -129,7 +152,7 @@ func DecodeLine(dec *lineprotocol.Decoder, continue } - typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0] + st.typeBuf, st.subTypeBuf = st.typeBuf[:0], st.subTypeBuf[:0] cluster, host := clusterDefault, "" for { key, val, err := dec.NextTag() @@ -162,41 +185,49 @@ func DecodeLine(dec *lineprotocol.Decoder, } // We cannot be sure that the "type" tag comes before the "type-id" tag: - if len(typeBuf) == 0 { - typeBuf = append(typeBuf, val...) + if len(st.typeBuf) == 0 { + st.typeBuf = append(st.typeBuf, val...) } else { - typeBuf = reorder(typeBuf, val) + st.typeBuf = reorder(st.typeBuf, val) } case "type-id": - typeBuf = append(typeBuf, val...) + st.typeBuf = append(st.typeBuf, val...) case "subtype": // We cannot be sure that the "subtype" tag comes before the "stype-id" tag: - if len(subTypeBuf) == 0 { - subTypeBuf = append(subTypeBuf, val...) + if len(st.subTypeBuf) == 0 { + st.subTypeBuf = append(st.subTypeBuf, val...) } else { - subTypeBuf = reorder(subTypeBuf, val) - // subTypeBuf = reorder(typeBuf, val) + st.subTypeBuf = reorder(st.subTypeBuf, val) } case "stype-id": - subTypeBuf = append(subTypeBuf, val...) + st.subTypeBuf = append(st.subTypeBuf, val...) default: } } // If the cluster or host changed, the lvl was set to nil if lvl == nil { - selector = selector[:2] - selector[0], selector[1] = cluster, host - lvl = ms.GetLevel(selector) + st.selector = st.selector[:2] + st.selector[0], st.selector[1] = cluster, host + lvl = ms.GetLevel(st.selector) prevCluster, prevHost = cluster, host } - // subtypes: - selector = selector[:0] - if len(typeBuf) > 0 { - selector = append(selector, string(typeBuf)) // <- Allocation :( - if len(subTypeBuf) > 0 { - selector = append(selector, string(subTypeBuf)) + // subtypes: cache []byte→string conversions; messages in a batch typically + // share the same type/subtype so the hit rate is very high. + st.selector = st.selector[:0] + if len(st.typeBuf) > 0 { + if !bytes.Equal(st.typeBuf, st.prevTypeBytes) { + st.prevTypeBytes = append(st.prevTypeBytes[:0], st.typeBuf...) + st.prevTypeStr = string(st.typeBuf) + } + st.selector = append(st.selector, st.prevTypeStr) + if len(st.subTypeBuf) > 0 { + if !bytes.Equal(st.subTypeBuf, st.prevSubTypeBytes) { + st.prevSubTypeBytes = append(st.prevSubTypeBytes[:0], st.subTypeBuf...) + st.prevSubTypeStr = string(st.subTypeBuf) + } + st.selector = append(st.selector, st.prevSubTypeStr) } } @@ -244,7 +275,7 @@ func DecodeLine(dec *lineprotocol.Decoder, time := t.Unix() - if err := ms.WriteToLevel(lvl, selector, time, []Metric{metric}); err != nil { + if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil { return err } }