Fix parsing of metric subtypes (key stype)

This commit is contained in:
Thomas Roehl
2026-05-04 18:10:01 +02:00
parent 196f659a50
commit c76219651e
2 changed files with 11 additions and 11 deletions

View File

@@ -78,7 +78,7 @@ type APIQueryResponse struct {
// - Type + TypeIds: First level of hierarchy (e.g., "cpu" + ["0", "1", "2"]) // - Type + TypeIds: First level of hierarchy (e.g., "cpu" + ["0", "1", "2"])
// - SubType + SubTypeIds: Second level of hierarchy (e.g., "core" + ["0", "1"]) // - SubType + SubTypeIds: Second level of hierarchy (e.g., "core" + ["0", "1"])
// //
// If Aggregate is true, data from multiple type/subtype IDs will be aggregated according // If Aggregate is true, data from multiple type/stype IDs will be aggregated according
// to the metric's aggregation strategy. Otherwise, separate results are returned for each combination. // to the metric's aggregation strategy. Otherwise, separate results are returned for each combination.
type APIQuery struct { type APIQuery struct {
Type *string `json:"type,omitempty"` Type *string `json:"type,omitempty"`
@@ -174,13 +174,13 @@ func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metr
// This is the primary API for retrieving metric data from the memory store. It supports: // This is the primary API for retrieving metric data from the memory store. It supports:
// - Individual queries via req.Queries // - Individual queries via req.Queries
// - Batch queries for all nodes via req.ForAllNodes // - Batch queries for all nodes via req.ForAllNodes
// - Hierarchical selector construction (cluster → host → type → subtype) // - Hierarchical selector construction (cluster → host → type → stype)
// - Optional statistics computation (avg, min, max) // - Optional statistics computation (avg, min, max)
// - Optional data scaling // - Optional data scaling
// - Optional data padding with NaN values // - Optional data padding with NaN values
// //
// The function constructs selectors based on the query parameters and calls MemoryStore.Read() // The function constructs selectors based on the query parameters and calls MemoryStore.Read()
// for each selector. If a query specifies Aggregate=false with multiple type/subtype IDs, // for each selector. If a query specifies Aggregate=false with multiple type/stype IDs,
// separate results are returned for each combination. // separate results are returned for each combination.
// //
// Parameters: // Parameters:

View File

@@ -6,11 +6,11 @@
// This file implements ingestion of InfluxDB line-protocol metric data received // This file implements ingestion of InfluxDB line-protocol metric data received
// over NATS. Each line encodes one metric sample with the following structure: // over NATS. Each line encodes one metric sample with the following structure:
// //
// <measurement>[,cluster=<c>][,hostname=<h>][,type=<t>][,type-id=<id>][,subtype=<s>][,stype-id=<id>] value=<v> [<timestamp>] // <measurement>[,cluster=<c>][,hostname=<h>][,type=<t>][,type-id=<id>][,stype=<s>][,stype-id=<id>] value=<v> [<timestamp>]
// //
// The measurement name identifies the metric (e.g. "cpu_load"). Tags provide // The measurement name identifies the metric (e.g. "cpu_load"). Tags provide
// routing information (cluster, host) and optional sub-device selectors (type, // routing information (cluster, host) and optional sub-device selectors (type,
// subtype). Only one field is expected per line: "value". // stype). Only one field is expected per line: "value".
// //
// After decoding, each sample is: // After decoding, each sample is:
// 1. Written to the in-memory store via ms.WriteToLevel. // 1. Written to the in-memory store via ms.WriteToLevel.
@@ -103,7 +103,7 @@ func ReceiveNats(ms *MemoryStore,
// reorder prepends prefix to buf in-place when buf has enough spare capacity, // reorder prepends prefix to buf in-place when buf has enough spare capacity,
// avoiding an allocation. Falls back to a regular append otherwise. // avoiding an allocation. Falls back to a regular append otherwise.
// //
// It is used to assemble the "type<type-id>" and "subtype<stype-id>" selector // It is used to assemble the "type<type-id>" and "stype<stype-id>" selector
// strings when the type tag arrives before the type-id tag in the line, so the // strings when the type tag arrives before the type-id tag in the line, so the
// two byte slices need to be concatenated in tag-declaration order regardless // two byte slices need to be concatenated in tag-declaration order regardless
// of wire order. // of wire order.
@@ -145,7 +145,7 @@ type decodeState struct {
// current line. Reset at the start of each line's tag-decode loop. // current line. Reset at the start of each line's tag-decode loop.
typeBuf []byte typeBuf []byte
// subTypeBuf accumulates the concatenated "subtype"+"stype-id" tag value. // subTypeBuf accumulates the concatenated "stype"+"stype-id" tag value.
// Reset at the start of each line's tag-decode loop. // Reset at the start of each line's tag-decode loop.
subTypeBuf []byte subTypeBuf []byte
@@ -186,7 +186,7 @@ var decodeStatePool = sync.Pool{
// - The Level pointer (host-level node in the metric tree) is cached across // - The Level pointer (host-level node in the metric tree) is cached across
// consecutive lines that share the same cluster+host pair to avoid // consecutive lines that share the same cluster+host pair to avoid
// repeated lock acquisitions on the root and cluster levels. // repeated lock acquisitions on the root and cluster levels.
// - []byte→string conversions for type/subtype selectors are cached via // - []byte→string conversions for type/stype selectors are cached via
// prevType*/prevSubType* fields because batches typically repeat the same // prevType*/prevSubType* fields because batches typically repeat the same
// sub-device identifiers. // sub-device identifiers.
// - Timestamp parsing tries Second precision first; if that fails it retries // - Timestamp parsing tries Second precision first; if that fails it retries
@@ -269,8 +269,8 @@ func DecodeLine(dec *lineprotocol.Decoder,
} }
case "type-id": case "type-id":
st.typeBuf = append(st.typeBuf, val...) st.typeBuf = append(st.typeBuf, val...)
case "subtype": case "stype":
// We cannot be sure that the "subtype" tag comes before the "stype-id" tag: // We cannot be sure that the "stype" tag comes before the "stype-id" tag:
if len(st.subTypeBuf) == 0 { if len(st.subTypeBuf) == 0 {
st.subTypeBuf = append(st.subTypeBuf, val...) st.subTypeBuf = append(st.subTypeBuf, val...)
} else { } else {
@@ -291,7 +291,7 @@ func DecodeLine(dec *lineprotocol.Decoder,
} }
// subtypes: cache []byte→string conversions; messages in a batch typically // subtypes: cache []byte→string conversions; messages in a batch typically
// share the same type/subtype so the hit rate is very high. // share the same type/stype so the hit rate is very high.
st.selector = st.selector[:0] st.selector = st.selector[:0]
if len(st.typeBuf) > 0 { if len(st.typeBuf) > 0 {
if !bytes.Equal(st.typeBuf, st.prevTypeBytes) { if !bytes.Equal(st.typeBuf, st.prevTypeBytes) {