diff --git a/lineprotocol.go b/lineprotocol.go index 5e84da0..d2ecb51 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -50,6 +50,7 @@ func (f *Float) UnmarshalJSON(input []byte) error { type Metric struct { Name string + minfo metricInfo Value Float } @@ -116,6 +117,25 @@ func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder) erro return nil } +// Place `prefix` in front of `buf` but if possible, +// do that inplace in `buf`. +func reorder(buf, prefix []byte) []byte { + n := len(prefix) + m := len(buf) + if cap(buf) < m+n { + return append(prefix[:n:n], buf...) + } else { + buf = buf[:n+m] + for i := m - 1; i >= 0; i-- { + buf[i+n] = buf[i] + } + for i := 0; i < n; i++ { + buf[i] = prefix[i] + } + return buf + } +} + func decodeLine(dec *lineprotocol.Decoder) error { // Reduce allocations in loop: t := time.Now() @@ -126,17 +146,31 @@ func decodeLine(dec *lineprotocol.Decoder) error { // Optimize for the case where all lines in a "batch" are about the same // cluster and host. By using `WriteToLevel` (level = host), we do not need // to take the root- and cluster-level lock as often. - var hostLevel *level = nil + var lvl *level = nil var prevCluster, prevHost string = "", "" for dec.Next() { + metrics = metrics[:0] rawmeasurement, err := dec.Measurement() if err != nil { return err } + // A more dense lp format if supported if the measurement is 'data'. + // In that case, the field keys are used as metric names. + if string(rawmeasurement) != "data" { + minfo, ok := memoryStore.metrics[string(rawmeasurement)] + if !ok { + continue + } + + metrics = append(metrics, Metric{ + minfo: minfo, + }) + } + + typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0] var cluster, host string - var typeName, typeId, subType, subTypeId []byte for { key, val, err := dec.NextTag() if err != nil { @@ -153,54 +187,57 @@ func decodeLine(dec *lineprotocol.Decoder) error { cluster = prevCluster } else { cluster = string(val) + lvl = nil } - case "hostname": + case "hostname", "host": if string(val) == prevHost { host = prevHost } else { host = string(val) + lvl = nil } case "type": - typeName = val + if string(val) == "node" { + break + } + + if len(typeBuf) == 0 { + typeBuf = append(typeBuf, val...) + } else { + typeBuf = reorder(typeBuf, val) + } case "type-id": - typeId = val + typeBuf = append(typeBuf, val...) case "subtype": - subType = val + if len(subTypeBuf) == 0 { + subTypeBuf = append(subTypeBuf, val...) + } else { + subTypeBuf = reorder(typeBuf, val) + } case "stype-id": - subTypeId = val + subTypeBuf = append(subTypeBuf, val...) default: // Ignore unkown tags (cc-metric-collector might send us a unit for example that we do not need) // return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val)) } } - if hostLevel == nil || prevCluster != cluster || prevHost != host { - prevCluster = cluster - prevHost = host + if lvl == nil { selector = selector[:2] - selector[0] = cluster - selector[1] = host - hostLevel = memoryStore.root.findLevelOrCreate(selector, len(memoryStore.metrics)) + selector[0], selector[1] = cluster, host + lvl = memoryStore.GetLevel(selector) + prevCluster, prevHost = cluster, host } selector = selector[:0] - if len(typeId) > 0 { - typeBuf = typeBuf[:0] - typeBuf = append(typeBuf, typeName...) - typeBuf = append(typeBuf, typeId...) + if len(typeBuf) > 0 { selector = append(selector, string(typeBuf)) // <- Allocation :( - if len(subTypeId) > 0 { - subTypeBuf = subTypeBuf[:0] - subTypeBuf = append(subTypeBuf, subType...) - subTypeBuf = append(subTypeBuf, subTypeId...) + if len(subTypeBuf) > 0 { selector = append(selector, string(subTypeBuf)) } } - metrics = metrics[:0] - // A more dense lp format if supported if the measurement is 'data'. - // In that case, the field keys are used as metric names. - if string(rawmeasurement) == "data" { + if len(metrics) == 0 { for { key, val, err := dec.NextField() if err != nil { @@ -220,13 +257,17 @@ func decodeLine(dec *lineprotocol.Decoder) error { return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) } + minfo, ok := memoryStore.metrics[string(key)] + if !ok { + continue + } + metrics = append(metrics, Metric{ - Name: string(key), // <- Allocation :( + minfo: minfo, Value: value, }) } } else { - measurement := string(rawmeasurement) // <- Allocation :( var value Float for { key, val, err := dec.NextField() @@ -251,10 +292,7 @@ func decodeLine(dec *lineprotocol.Decoder) error { } } - metrics = append(metrics, Metric{ - Name: measurement, - Value: value, - }) + metrics[0].Value = value } t, err = dec.Time(lineprotocol.Second, t) @@ -263,7 +301,7 @@ func decodeLine(dec *lineprotocol.Decoder) error { } // log.Printf("write: %s (%v) -> %v\n", string(measurement), selector, value) - if err := memoryStore.WriteToLevel(hostLevel, selector, t.Unix(), metrics); err != nil { + if err := memoryStore.WriteToLevel(lvl, selector, t.Unix(), metrics); err != nil { return err } } diff --git a/lineprotocol_test.go b/lineprotocol_test.go new file mode 100644 index 0000000..83efe84 --- /dev/null +++ b/lineprotocol_test.go @@ -0,0 +1,58 @@ +package main + +import ( + "log" + "testing" + + "github.com/influxdata/line-protocol/v2/lineprotocol" +) + +const TestDataClassicFormat string = ` +m1,cluster=ctest,hostname=htest1,type=node value=1 123456789 +m2,cluster=ctest,hostname=htest1,type=node value=2 123456789 +m3,cluster=ctest,hostname=htest2,type=node value=3 123456789 +m4,cluster=ctest,hostname=htest2,type=core,type-id=1 value=4 123456789 +m4,cluster=ctest,hostname=htest2,type-id=2,type=core value=5 123456789 +` + +func TestLineprotocolDecoder(t *testing.T) { + prevMemoryStore := memoryStore + t.Cleanup(func() { + memoryStore = prevMemoryStore + }) + + memoryStore = NewMemoryStore(map[string]MetricConfig{ + "m1": {Frequency: 1}, + "m2": {Frequency: 1}, + "m3": {Frequency: 1}, + "m4": {Frequency: 1}, + }) + + dec := lineprotocol.NewDecoderWithBytes([]byte(TestDataClassicFormat)) + if err := decodeLine(dec); err != nil { + log.Fatal(err) + } + + // memoryStore.DebugDump(bufio.NewWriter(os.Stderr)) + + h1 := memoryStore.GetLevel([]string{"ctest", "htest1"}) + h1b1 := h1.metrics[memoryStore.metrics["m1"].offset] + h1b2 := h1.metrics[memoryStore.metrics["m2"].offset] + if h1b1.data[0] != 1.0 || h1b2.data[0] != 2.0 { + log.Fatal() + } + + h2 := memoryStore.GetLevel([]string{"ctest", "htest2"}) + h2b3 := h2.metrics[memoryStore.metrics["m3"].offset] + if h2b3.data[0] != 3.0 { + log.Fatal() + } + + h2c1 := memoryStore.GetLevel([]string{"ctest", "htest2", "core1"}) + h2c1b4 := h2c1.metrics[memoryStore.metrics["m4"].offset] + h2c2 := memoryStore.GetLevel([]string{"ctest", "htest2", "core2"}) + h2c2b4 := h2c2.metrics[memoryStore.metrics["m4"].offset] + if h2c1b4.data[0] != 4.0 || h2c2b4.data[0] != 5.0 { + log.Fatal() + } +} diff --git a/memstore.go b/memstore.go index 339d195..9a909f5 100644 --- a/memstore.go +++ b/memstore.go @@ -313,23 +313,21 @@ const ( AvgAggregation ) +type metricInfo struct { + offset int + aggregation AggregationStrategy + frequency int64 +} + type MemoryStore struct { root level // root of the tree structure - metrics map[string]struct { - offset int - aggregation AggregationStrategy - frequency int64 - } + metrics map[string]metricInfo } // Return a new, initialized instance of a MemoryStore. // Will panic if values in the metric configurations are invalid. func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { - ms := make(map[string]struct { - offset int - aggregation AggregationStrategy - frequency int64 - }) + ms := make(map[string]metricInfo) offset := 0 for key, config := range metrics { @@ -342,11 +340,11 @@ func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { panic("invalid aggregation strategy: " + config.Aggregation) } - ms[key] = struct { - offset int - aggregation AggregationStrategy - frequency int64 - }{ + if config.Frequency == 0 { + panic("invalid frequency") + } + + ms[key] = metricInfo{ offset: offset, aggregation: aggregation, frequency: config.Frequency, @@ -367,26 +365,40 @@ func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { // Write all values in `metrics` to the level specified by `selector` for time `ts`. // Look at `findLevelOrCreate` for how selectors work. func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error { + var ok bool + for i, metric := range metrics { + if metric.minfo.frequency == 0 { + metric.minfo, ok = m.metrics[metric.Name] + if !ok { + metric.minfo.frequency = 0 + } + metrics[i] = metric + } + } + return m.WriteToLevel(&m.root, selector, ts, metrics) } +func (m *MemoryStore) GetLevel(selector []string) *level { + return m.root.findLevelOrCreate(selector, len(m.metrics)) +} + +// Assumes that `minfo` in `metrics` is filled in! func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metrics []Metric) error { l = l.findLevelOrCreate(selector, len(m.metrics)) l.lock.Lock() defer l.lock.Unlock() for _, metric := range metrics { - minfo, ok := m.metrics[metric.Name] - if !ok { - // return errors.New("Unknown metric: " + metric.Name) + if metric.minfo.frequency == 0 { continue } - b := l.metrics[minfo.offset] + b := l.metrics[metric.minfo.offset] if b == nil { // First write to this metric and level - b = newBuffer(ts, minfo.frequency) - l.metrics[minfo.offset] = b + b = newBuffer(ts, metric.minfo.frequency) + l.metrics[metric.minfo.offset] = b } nb, err := b.write(ts, metric.Value) @@ -396,7 +408,7 @@ func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metric // Last write created a new buffer... if b != nb { - l.metrics[minfo.offset] = nb + l.metrics[metric.minfo.offset] = nb } } return nil