From a1db8263d72b9727347ea69e0cc832ec67bd1235 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 27 Feb 2026 12:30:27 +0100 Subject: [PATCH] Document line protocol. Optimize REST writeMetric path --- internal/api/metricstore.go | 30 ++++-- pkg/metricstore/lineprotocol.go | 163 ++++++++++++++++++++++++++------ 2 files changed, 155 insertions(+), 38 deletions(-) diff --git a/internal/api/metricstore.go b/internal/api/metricstore.go index ff4deb6a..325b26ba 100644 --- a/internal/api/metricstore.go +++ b/internal/api/metricstore.go @@ -10,7 +10,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "net/http" "strconv" "strings" @@ -90,16 +89,17 @@ func freeMetrics(rw http.ResponseWriter, r *http.Request) { // @security ApiKeyAuth // @router /write/ [post] func writeMetrics(rw http.ResponseWriter, r *http.Request) { - bytes, err := io.ReadAll(r.Body) rw.Header().Add("Content-Type", "application/json") - if err != nil { - handleError(err, http.StatusInternalServerError, rw) - return - } + // Extract the "cluster" query parameter without allocating a url.Values map. + cluster := queryParam(r.URL.RawQuery, "cluster") + + // Stream directly from the request body instead of copying it into a + // temporary buffer via io.ReadAll. The line-protocol decoder supports + // io.Reader natively, so this avoids the largest heap allocation. ms := metricstore.GetMemoryStore() - dec := lineprotocol.NewDecoderWithBytes(bytes) - if err := metricstore.DecodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil { + dec := lineprotocol.NewDecoder(r.Body) + if err := metricstore.DecodeLine(dec, ms, cluster); err != nil { cclog.Errorf("/api/write error: %s", err.Error()) handleError(err, http.StatusBadRequest, rw) return @@ -107,6 +107,20 @@ func writeMetrics(rw http.ResponseWriter, r *http.Request) { rw.WriteHeader(http.StatusOK) } +// queryParam extracts a single query-parameter value from a raw query string +// without allocating a url.Values map. Returns "" if the key is not present. Key and value are matched and returned verbatim from the raw query: unlike url.Values.Get, no percent- or '+'-decoding is applied. 
+func queryParam(raw, key string) string { + for raw != "" { + var kv string + kv, raw, _ = strings.Cut(raw, "&") + k, v, _ := strings.Cut(kv, "=") + if k == key { + return v + } + } + return "" +} + // handleDebug godoc // @summary Debug endpoint // @tags debug diff --git a/pkg/metricstore/lineprotocol.go b/pkg/metricstore/lineprotocol.go index f8c83e31..ecae3df1 100644 --- a/pkg/metricstore/lineprotocol.go +++ b/pkg/metricstore/lineprotocol.go @@ -3,9 +3,23 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +// This file implements ingestion of InfluxDB line-protocol metric data received +// over NATS. Each line encodes one metric sample with the following structure: +// +// <measurement>[,cluster=<cluster>][,hostname=<hostname>][,type=<type>][,type-id=<type-id>][,subtype=<subtype>][,stype-id=<stype-id>] value=<value> [<timestamp>] +// +// The measurement name identifies the metric (e.g. "cpu_load"). Tags provide +// routing information (cluster, host) and optional sub-device selectors (type, +// subtype). Only one field is expected per line: "value". +// +// After decoding, each sample is: +// 1. Written to the in-memory store via ms.WriteToLevel. +// 2. If the checkpoint format is "wal", also forwarded to the WAL staging +// goroutine via the WALMessages channel for durable write-ahead logging. package metricstore import ( + "bytes" "context" "fmt" "sync" @@ -17,6 +31,16 @@ import ( "github.com/ClusterCockpit/cc-line-protocol/v2/lineprotocol" ) +// ReceiveNats subscribes to all configured NATS subjects and feeds incoming +// line-protocol messages into the MemoryStore. +// +// When workers > 1 a pool of goroutines drains a shared channel so that +// multiple messages can be decoded in parallel. With workers == 1 the NATS +// callback decodes inline (no channel overhead, lower latency). +// +// The function blocks until ctx is cancelled and all worker goroutines have +// finished. 
It returns nil when the NATS client is not configured; callers +// should treat that as a no-op rather than an error. func ReceiveNats(ms *MemoryStore, workers int, ctx context.Context, @@ -75,8 +99,13 @@ func ReceiveNats(ms *MemoryStore, return nil } -// Place `prefix` in front of `buf` but if possible, -// do that inplace in `buf`. +// reorder prepends prefix to buf in-place when buf has enough spare capacity, +// avoiding an allocation. Falls back to a regular append otherwise. +// +// It is used to assemble the "<type><type-id>" and "<subtype><stype-id>" selector +// strings when the type-id (or stype-id) tag arrives before the type (or subtype) tag in the line, so the +// two byte slices need to be concatenated in tag-declaration order regardless +// of wire order. func reorder(buf, prefix []byte) []byte { n := len(prefix) m := len(buf) @@ -94,17 +123,83 @@ func reorder(buf, prefix []byte) []byte { } } -// Decode lines using dec and make write calls to the MemoryStore. -// If a line is missing its cluster tag, use clusterDefault as default. +// decodeState holds the per-call scratch buffers used by DecodeLine. +// Instances are recycled via decodeStatePool to avoid repeated allocations +// during high-throughput ingestion. +type decodeState struct { + // metricBuf holds a copy of the current measurement name (line-protocol + // measurement field). Copied because dec.Measurement() returns a slice + // that is invalidated by the next decoder call. + metricBuf []byte + + // selector is the sub-device path passed to WriteToLevel and WALMessage + // (e.g. ["socket0"] or ["socket0", "memctrl1"]). Reused across lines. + selector []string + + // typeBuf accumulates the concatenated "type"+"type-id" tag value for the + // current line. Reset at the start of each line's tag-decode loop. + typeBuf []byte + + // subTypeBuf accumulates the concatenated "subtype"+"stype-id" tag value. + // Reset at the start of each line's tag-decode loop. 
+ subTypeBuf []byte + + // prevTypeBytes / prevTypeStr cache the last seen typeBuf content and its + // string conversion. Because consecutive lines in a batch typically address + // the same sub-device, the cache hit rate is very high and avoids + // repeated []byte→string allocations. + prevTypeBytes []byte + prevTypeStr string + + // prevSubTypeBytes / prevSubTypeStr are the same cache for the subtype. + prevSubTypeBytes []byte + prevSubTypeStr string +} + +// decodeStatePool recycles decodeState values across DecodeLine calls to +// reduce GC pressure during sustained metric ingestion. +var decodeStatePool = sync.Pool{ + New: func() any { + return &decodeState{ + metricBuf: make([]byte, 0, 16), + selector: make([]string, 0, 4), + typeBuf: make([]byte, 0, 16), + subTypeBuf: make([]byte, 0, 16), + } + }, +} + +// DecodeLine reads all lines from dec (InfluxDB line-protocol) and writes each +// decoded metric sample into ms. +// +// clusterDefault is used as the cluster name for lines that do not carry a +// "cluster" tag. Callers typically supply the ClusterTag value from the NATS +// subscription configuration. +// +// Performance notes: +// - A decodeState is obtained from decodeStatePool to reuse scratch buffers. +// - The Level pointer (host-level node in the metric tree) is cached across +// consecutive lines that share the same cluster+host pair to avoid +// repeated lock acquisitions on the root and cluster levels. +// - []byte→string conversions for type/subtype selectors are cached via +// prevType*/prevSubType* fields because batches typically repeat the same +// sub-device identifiers. +// - Timestamp parsing tries Second precision first; if that fails it retries +// Millisecond, Microsecond, and Nanosecond in turn. A missing timestamp +// falls back to time.Now(). 
+// +// When the checkpoint format is "wal" each successfully decoded sample is also +// sent to WALMessages so the WAL staging goroutine can persist it durably +// before the next binary snapshot. func DecodeLine(dec *lineprotocol.Decoder, ms *MemoryStore, clusterDefault string, ) error { // Reduce allocations in loop: t := time.Now() - metric, metricBuf := Metric{}, make([]byte, 0, 16) - selector := make([]string, 0, 4) - typeBuf, subTypeBuf := make([]byte, 0, 16), make([]byte, 0) + metric := Metric{} + st := decodeStatePool.Get().(*decodeState) + defer decodeStatePool.Put(st) // Optimize for the case where all lines in a "batch" are about the same // cluster and host. By using `WriteToLevel` (level = host), we do not need @@ -121,7 +216,7 @@ func DecodeLine(dec *lineprotocol.Decoder, // Needs to be copied because another call to dec.* would // invalidate the returned slice. - metricBuf = append(metricBuf[:0], rawmeasurement...) + st.metricBuf = append(st.metricBuf[:0], rawmeasurement...) // The go compiler optimizes map[string(byteslice)] lookups: metric.MetricConfig, ok = ms.Metrics[string(rawmeasurement)] @@ -129,7 +224,7 @@ func DecodeLine(dec *lineprotocol.Decoder, continue } - typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0] + st.typeBuf, st.subTypeBuf = st.typeBuf[:0], st.subTypeBuf[:0] cluster, host := clusterDefault, "" for { key, val, err := dec.NextTag() @@ -162,41 +257,49 @@ func DecodeLine(dec *lineprotocol.Decoder, } // We cannot be sure that the "type" tag comes before the "type-id" tag: - if len(typeBuf) == 0 { - typeBuf = append(typeBuf, val...) + if len(st.typeBuf) == 0 { + st.typeBuf = append(st.typeBuf, val...) } else { - typeBuf = reorder(typeBuf, val) + st.typeBuf = reorder(st.typeBuf, val) } case "type-id": - typeBuf = append(typeBuf, val...) + st.typeBuf = append(st.typeBuf, val...) 
case "subtype": // We cannot be sure that the "subtype" tag comes before the "stype-id" tag: - if len(subTypeBuf) == 0 { - subTypeBuf = append(subTypeBuf, val...) + if len(st.subTypeBuf) == 0 { + st.subTypeBuf = append(st.subTypeBuf, val...) } else { - subTypeBuf = reorder(subTypeBuf, val) - // subTypeBuf = reorder(typeBuf, val) + st.subTypeBuf = reorder(st.subTypeBuf, val) } case "stype-id": - subTypeBuf = append(subTypeBuf, val...) + st.subTypeBuf = append(st.subTypeBuf, val...) default: } } // If the cluster or host changed, the lvl was set to nil if lvl == nil { - selector = selector[:2] - selector[0], selector[1] = cluster, host - lvl = ms.GetLevel(selector) + st.selector = st.selector[:2] + st.selector[0], st.selector[1] = cluster, host + lvl = ms.GetLevel(st.selector) prevCluster, prevHost = cluster, host } - // subtypes: - selector = selector[:0] - if len(typeBuf) > 0 { - selector = append(selector, string(typeBuf)) // <- Allocation :( - if len(subTypeBuf) > 0 { - selector = append(selector, string(subTypeBuf)) + // subtypes: cache []byte→string conversions; messages in a batch typically + // share the same type/subtype so the hit rate is very high. + st.selector = st.selector[:0] + if len(st.typeBuf) > 0 { + if !bytes.Equal(st.typeBuf, st.prevTypeBytes) { + st.prevTypeBytes = append(st.prevTypeBytes[:0], st.typeBuf...) + st.prevTypeStr = string(st.typeBuf) + } + st.selector = append(st.selector, st.prevTypeStr) + if len(st.subTypeBuf) > 0 { + if !bytes.Equal(st.subTypeBuf, st.prevSubTypeBytes) { + st.prevSubTypeBytes = append(st.prevSubTypeBytes[:0], st.subTypeBuf...) 
+ st.prevSubTypeStr = string(st.subTypeBuf) + } + st.selector = append(st.selector, st.prevSubTypeStr) } } @@ -246,16 +349,16 @@ func DecodeLine(dec *lineprotocol.Decoder, if Keys.Checkpoints.FileFormat == "wal" { WALMessages <- &WALMessage{ - MetricName: string(metricBuf), + MetricName: string(st.metricBuf), Cluster: cluster, Node: host, - Selector: append([]string{}, selector...), + Selector: append([]string{}, st.selector...), Value: metric.Value, Timestamp: time, } } - if err := ms.WriteToLevel(lvl, selector, time, []Metric{metric}); err != nil { + if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil { return err } }