Document line protocol. Optimize REST writeMetric path

This commit is contained in:
2026-02-27 12:30:27 +01:00
parent 4c3cd8e66a
commit a1db8263d7
2 changed files with 155 additions and 38 deletions

View File

@@ -10,7 +10,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
@@ -90,16 +89,17 @@ func freeMetrics(rw http.ResponseWriter, r *http.Request) {
// @security ApiKeyAuth
// @router /write/ [post]
func writeMetrics(rw http.ResponseWriter, r *http.Request) {
bytes, err := io.ReadAll(r.Body)
rw.Header().Add("Content-Type", "application/json")
if err != nil {
handleError(err, http.StatusInternalServerError, rw)
return
}
// Extract the "cluster" query parameter without allocating a url.Values map.
cluster := queryParam(r.URL.RawQuery, "cluster")
// Stream directly from the request body instead of copying it into a
// temporary buffer via io.ReadAll. The line-protocol decoder supports
// io.Reader natively, so this avoids the largest heap allocation.
ms := metricstore.GetMemoryStore()
dec := lineprotocol.NewDecoderWithBytes(bytes)
if err := metricstore.DecodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil {
dec := lineprotocol.NewDecoder(r.Body)
if err := metricstore.DecodeLine(dec, ms, cluster); err != nil {
cclog.Errorf("/api/write error: %s", err.Error())
handleError(err, http.StatusBadRequest, rw)
return
@@ -107,6 +107,20 @@ func writeMetrics(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusOK)
}
// queryParam extracts a single query-parameter value from a raw (still
// percent-encoded) query string without building a url.Values map.
//
// Keys and values are decoded with url.QueryUnescape, so "%XX" escapes and
// "+" (space) are handled the same way r.URL.Query().Get(key) handles them.
// QueryUnescape returns its argument unchanged when it contains no escapes,
// so the common case of plain ASCII names stays allocation-free.
//
// Pairs whose key or value contains a malformed escape are skipped. If the
// key occurs more than once, the first well-formed occurrence wins. Returns ""
// if the key is not present or has no value.
func queryParam(raw, key string) string {
	for raw != "" {
		var kv string
		kv, raw, _ = strings.Cut(raw, "&")
		k, v, _ := strings.Cut(kv, "=")

		// Compare on the decoded key so "cl%75ster" matches "cluster".
		dk, err := url.QueryUnescape(k)
		if err != nil || dk != key {
			continue
		}

		dv, err := url.QueryUnescape(v)
		if err != nil {
			// Malformed value: ignore this pair and keep looking.
			continue
		}
		return dv
	}
	return ""
}
// handleDebug godoc
// @summary Debug endpoint
// @tags debug

View File

@@ -3,9 +3,23 @@
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
// This file implements ingestion of InfluxDB line-protocol metric data received
// over NATS or through the REST write endpoint (both paths end up in
// DecodeLine). Each line encodes one metric sample with the following structure:
//
// <measurement>[,cluster=<c>][,hostname=<h>][,type=<t>][,type-id=<id>][,subtype=<s>][,stype-id=<id>] value=<v> [<timestamp>]
//
// The measurement name identifies the metric (e.g. "cpu_load"). Tags provide
// routing information (cluster, host) and optional sub-device selectors (type,
// subtype). Only one field is expected per line: "value".
//
// After decoding, each sample is:
// 1. Written to the in-memory store via ms.WriteToLevel.
// 2. If the checkpoint format is "wal", also forwarded to the WAL staging
// goroutine via the WALMessages channel for durable write-ahead logging.
package metricstore
import (
"bytes"
"context"
"fmt"
"sync"
@@ -17,6 +31,16 @@ import (
"github.com/ClusterCockpit/cc-line-protocol/v2/lineprotocol"
)
// ReceiveNats subscribes to all configured NATS subjects and feeds incoming
// line-protocol messages into the MemoryStore.
//
// When workers > 1 a pool of goroutines drains a shared channel so that
// multiple messages can be decoded in parallel. With workers == 1 the NATS
// callback decodes inline (no channel overhead, lower latency).
//
// The function blocks until ctx is cancelled and all worker goroutines have
// finished. It returns nil when the NATS client is not configured; callers
// should treat that as a no-op rather than an error.
func ReceiveNats(ms *MemoryStore,
workers int,
ctx context.Context,
@@ -75,8 +99,13 @@ func ReceiveNats(ms *MemoryStore,
return nil
}
// Place `prefix` in front of `buf` but if possible,
// do that inplace in `buf`.
// reorder prepends prefix to buf in-place when buf has enough spare capacity,
// avoiding an allocation. Falls back to a regular append otherwise.
//
// It is used to assemble the "type<type-id>" and "subtype<stype-id>" selector
// strings when the type-id tag arrives before the type tag in the line (and
// likewise stype-id before subtype), so the two byte slices are concatenated
// in type-then-id order regardless of wire order.
func reorder(buf, prefix []byte) []byte {
n := len(prefix)
m := len(buf)
@@ -94,17 +123,83 @@ func reorder(buf, prefix []byte) []byte {
}
}
// Decode lines using dec and make write calls to the MemoryStore.
// If a line is missing its cluster tag, use clusterDefault as default.
// decodeState bundles the scratch buffers that DecodeLine needs while it
// walks the lines of one batch. Values are obtained from and returned to
// decodeStatePool so the buffers survive across calls instead of being
// allocated again for each batch.
type decodeState struct {
	// metricBuf is a private copy of the current line's measurement name.
	// The slice handed out by the decoder is invalidated by the next decoder
	// call, hence the copy.
	metricBuf []byte

	// selector is the reusable sub-device path handed to WriteToLevel and
	// copied into each WALMessage (for example ["socket0"] or
	// ["socket0", "memctrl1"]). DecodeLine also borrows its first two slots
	// for the {cluster, host} lookup via GetLevel, so it needs capacity >= 2.
	selector []string

	// typeBuf collects the "type" and "type-id" tag values of the current
	// line, subTypeBuf the "subtype" and "stype-id" values. Both are
	// truncated to length zero before each line's tags are decoded.
	typeBuf, subTypeBuf []byte

	// prevTypeBytes and prevTypeStr remember the most recent typeBuf content
	// together with its string form. When the next line carries the same
	// bytes the cached string is reused, skipping a []byte→string conversion.
	prevTypeBytes []byte
	prevTypeStr   string

	// prevSubTypeBytes and prevSubTypeStr are the equivalent cache for
	// subTypeBuf.
	prevSubTypeBytes []byte
	prevSubTypeStr   string
}
// decodeStatePool hands out reusable *decodeState values to DecodeLine so
// that the scratch buffers are not reallocated on every call.
//
// A freshly created state starts with empty buffers that already have a small
// capacity: 16 bytes for each byte buffer and 4 entries for the selector.
var decodeStatePool = sync.Pool{
	New: func() any {
		st := new(decodeState)
		st.metricBuf = make([]byte, 0, 16)
		st.selector = make([]string, 0, 4)
		st.typeBuf = make([]byte, 0, 16)
		st.subTypeBuf = make([]byte, 0, 16)
		return st
	},
}
// DecodeLine reads all lines from dec (InfluxDB line-protocol) and writes each
// decoded metric sample into ms.
//
// clusterDefault is used as the cluster name for lines that do not carry a
// "cluster" tag. NATS callers typically supply the ClusterTag value from the
// subscription configuration; the REST write handler passes the value of its
// "cluster" query parameter.
//
// Performance notes:
// - A decodeState is obtained from decodeStatePool to reuse scratch buffers.
// - The Level pointer (host-level node in the metric tree) is cached across
// consecutive lines that share the same cluster+host pair to avoid
// repeated lock acquisitions on the root and cluster levels.
// - []byte→string conversions for type/subtype selectors are cached via
// prevType*/prevSubType* fields because batches typically repeat the same
// sub-device identifiers.
// - Timestamp parsing tries Second precision first; if that fails it retries
// Millisecond, Microsecond, and Nanosecond in turn. A missing timestamp
// falls back to time.Now().
//
// When the checkpoint format is "wal" each successfully decoded sample is also
// sent to WALMessages so the WAL staging goroutine can persist it durably
// before the next binary snapshot.
func DecodeLine(dec *lineprotocol.Decoder,
ms *MemoryStore,
clusterDefault string,
) error {
// Reduce allocations in loop:
t := time.Now()
metric, metricBuf := Metric{}, make([]byte, 0, 16)
selector := make([]string, 0, 4)
typeBuf, subTypeBuf := make([]byte, 0, 16), make([]byte, 0)
metric := Metric{}
st := decodeStatePool.Get().(*decodeState)
defer decodeStatePool.Put(st)
// Optimize for the case where all lines in a "batch" are about the same
// cluster and host. By using `WriteToLevel` (level = host), we do not need
@@ -121,7 +216,7 @@ func DecodeLine(dec *lineprotocol.Decoder,
// Needs to be copied because another call to dec.* would
// invalidate the returned slice.
metricBuf = append(metricBuf[:0], rawmeasurement...)
st.metricBuf = append(st.metricBuf[:0], rawmeasurement...)
// The go compiler optimizes map[string(byteslice)] lookups:
metric.MetricConfig, ok = ms.Metrics[string(rawmeasurement)]
@@ -129,7 +224,7 @@ func DecodeLine(dec *lineprotocol.Decoder,
continue
}
typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0]
st.typeBuf, st.subTypeBuf = st.typeBuf[:0], st.subTypeBuf[:0]
cluster, host := clusterDefault, ""
for {
key, val, err := dec.NextTag()
@@ -162,41 +257,49 @@ func DecodeLine(dec *lineprotocol.Decoder,
}
// We cannot be sure that the "type" tag comes before the "type-id" tag:
if len(typeBuf) == 0 {
typeBuf = append(typeBuf, val...)
if len(st.typeBuf) == 0 {
st.typeBuf = append(st.typeBuf, val...)
} else {
typeBuf = reorder(typeBuf, val)
st.typeBuf = reorder(st.typeBuf, val)
}
case "type-id":
typeBuf = append(typeBuf, val...)
st.typeBuf = append(st.typeBuf, val...)
case "subtype":
// We cannot be sure that the "subtype" tag comes before the "stype-id" tag:
if len(subTypeBuf) == 0 {
subTypeBuf = append(subTypeBuf, val...)
if len(st.subTypeBuf) == 0 {
st.subTypeBuf = append(st.subTypeBuf, val...)
} else {
subTypeBuf = reorder(subTypeBuf, val)
// subTypeBuf = reorder(typeBuf, val)
st.subTypeBuf = reorder(st.subTypeBuf, val)
}
case "stype-id":
subTypeBuf = append(subTypeBuf, val...)
st.subTypeBuf = append(st.subTypeBuf, val...)
default:
}
}
// If the cluster or host changed, the lvl was set to nil
if lvl == nil {
selector = selector[:2]
selector[0], selector[1] = cluster, host
lvl = ms.GetLevel(selector)
st.selector = st.selector[:2]
st.selector[0], st.selector[1] = cluster, host
lvl = ms.GetLevel(st.selector)
prevCluster, prevHost = cluster, host
}
// subtypes:
selector = selector[:0]
if len(typeBuf) > 0 {
selector = append(selector, string(typeBuf)) // <- Allocation :(
if len(subTypeBuf) > 0 {
selector = append(selector, string(subTypeBuf))
// subtypes: cache []byte→string conversions; messages in a batch typically
// share the same type/subtype so the hit rate is very high.
st.selector = st.selector[:0]
if len(st.typeBuf) > 0 {
if !bytes.Equal(st.typeBuf, st.prevTypeBytes) {
st.prevTypeBytes = append(st.prevTypeBytes[:0], st.typeBuf...)
st.prevTypeStr = string(st.typeBuf)
}
st.selector = append(st.selector, st.prevTypeStr)
if len(st.subTypeBuf) > 0 {
if !bytes.Equal(st.subTypeBuf, st.prevSubTypeBytes) {
st.prevSubTypeBytes = append(st.prevSubTypeBytes[:0], st.subTypeBuf...)
st.prevSubTypeStr = string(st.subTypeBuf)
}
st.selector = append(st.selector, st.prevSubTypeStr)
}
}
@@ -246,16 +349,16 @@ func DecodeLine(dec *lineprotocol.Decoder,
if Keys.Checkpoints.FileFormat == "wal" {
WALMessages <- &WALMessage{
MetricName: string(metricBuf),
MetricName: string(st.metricBuf),
Cluster: cluster,
Node: host,
Selector: append([]string{}, selector...),
Selector: append([]string{}, st.selector...),
Value: metric.Value,
Timestamp: time,
}
}
if err := ms.WriteToLevel(lvl, selector, time, []Metric{metric}); err != nil {
if err := ms.WriteToLevel(lvl, st.selector, time, []Metric{metric}); err != nil {
return err
}
}