reduce allocations and locking

This commit is contained in:
Lou Knauer 2022-01-21 10:47:40 +01:00
parent 76c58d7799
commit bf7c33513b
2 changed files with 73 additions and 27 deletions

View File

@ -117,13 +117,26 @@ func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder) erro
} }
func decodeLine(dec *lineprotocol.Decoder) error { func decodeLine(dec *lineprotocol.Decoder) error {
// Reduce allocations in loop:
t := time.Now()
metrics := make([]Metric, 0, 10)
selector := make([]string, 0, 4)
typeBuf, subTypeBuf := make([]byte, 0, 20), make([]byte, 0)
// Optimize for the case where all lines in a "batch" are about the same
// cluster and host. By using `WriteToLevel` (level = host), we do not need
// to take the root- and cluster-level lock as often.
var hostLevel *level = nil
var prevCluster, prevHost string = "", ""
for dec.Next() { for dec.Next() {
rawmeasurement, err := dec.Measurement() rawmeasurement, err := dec.Measurement()
if err != nil { if err != nil {
return err return err
} }
var cluster, host, typeName, typeId, subType, subTypeId string var cluster, host string
var typeName, typeId, subType, subTypeId []byte
for { for {
key, val, err := dec.NextTag() key, val, err := dec.NextTag()
if err != nil { if err != nil {
@ -133,38 +146,61 @@ func decodeLine(dec *lineprotocol.Decoder) error {
break break
} }
// The go compiler optimizes string([]byte{...}) == "...":
switch string(key) { switch string(key) {
case "cluster": case "cluster":
if string(val) == prevCluster {
cluster = prevCluster
} else {
cluster = string(val) cluster = string(val)
}
case "hostname": case "hostname":
if string(val) == prevHost {
host = prevHost
} else {
host = string(val) host = string(val)
}
case "type": case "type":
typeName = string(val) typeName = val
case "type-id": case "type-id":
typeId = string(val) typeId = val
case "subtype": case "subtype":
subType = string(val) subType = val
case "stype-id": case "stype-id":
subTypeId = string(val) subTypeId = val
default: default:
// Ignore unkown tags (cc-metric-collector might send us a unit for example that we do not need) // Ignore unkown tags (cc-metric-collector might send us a unit for example that we do not need)
// return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val)) // return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val))
} }
} }
selector := make([]string, 2, 4) if hostLevel == nil || prevCluster != cluster || prevHost != host {
prevCluster = cluster
prevHost = host
selector = selector[:2]
selector[0] = cluster selector[0] = cluster
selector[1] = host selector[1] = host
hostLevel = memoryStore.root.findLevelOrCreate(selector, len(memoryStore.metrics))
}
selector = selector[:0]
if len(typeId) > 0 { if len(typeId) > 0 {
selector = append(selector, typeName+typeId) typeBuf = typeBuf[:0]
typeBuf = append(typeBuf, typeName...)
typeBuf = append(typeBuf, typeId...)
selector = append(selector, string(typeBuf)) // <- Allocation :(
if len(subTypeId) > 0 { if len(subTypeId) > 0 {
selector = append(selector, subType+subTypeId) subTypeBuf = subTypeBuf[:0]
subTypeBuf = append(subTypeBuf, subType...)
subTypeBuf = append(subTypeBuf, subTypeId...)
selector = append(selector, string(subTypeBuf))
} }
} }
metrics := make([]Metric, 0, 10) metrics = metrics[:0]
measurement := string(rawmeasurement) // A more dense lp format if supported if the measurement is 'data'.
if measurement == "data" { // In that case, the field keys are used as metric names.
if string(rawmeasurement) == "data" {
for { for {
key, val, err := dec.NextField() key, val, err := dec.NextField()
if err != nil { if err != nil {
@ -185,11 +221,12 @@ func decodeLine(dec *lineprotocol.Decoder) error {
} }
metrics = append(metrics, Metric{ metrics = append(metrics, Metric{
Name: string(key), Name: string(key), // <- Allocation :(
Value: value, Value: value,
}) })
} }
} else { } else {
measurement := string(rawmeasurement) // <- Allocation :(
var value Float var value Float
for { for {
key, val, err := dec.NextField() key, val, err := dec.NextField()
@ -220,13 +257,13 @@ func decodeLine(dec *lineprotocol.Decoder) error {
}) })
} }
t, err := dec.Time(lineprotocol.Second, time.Now()) t, err = dec.Time(lineprotocol.Second, t)
if err != nil { if err != nil {
return err return err
} }
// log.Printf("write: %s (%v) -> %v\n", string(measurement), selector, value) // log.Printf("write: %s (%v) -> %v\n", string(measurement), selector, value)
if err := memoryStore.Write(selector, t.Unix(), metrics); err != nil { if err := memoryStore.WriteToLevel(hostLevel, selector, t.Unix(), metrics); err != nil {
return err return err
} }
} }

View File

@ -2,7 +2,6 @@ package main
import ( import (
"errors" "errors"
"math"
"sync" "sync"
) )
@ -40,12 +39,14 @@ type buffer struct {
archived bool // If true, this buffer is already archived archived bool // If true, this buffer is already archived
closed bool closed bool
/*
statisticts struct { statisticts struct {
samples int samples int
min Float min Float
max Float max Float
avg Float avg Float
} }
*/
} }
func newBuffer(ts, freq int64) *buffer { func newBuffer(ts, freq int64) *buffer {
@ -104,6 +105,9 @@ func (b *buffer) end() int64 {
return b.start + int64(len(b.data))*b.frequency return b.start + int64(len(b.data))*b.frequency
} }
func (b *buffer) close() {}
/*
func (b *buffer) close() { func (b *buffer) close() {
if b.closed { if b.closed {
return return
@ -134,6 +138,7 @@ func (b *buffer) close() {
b.statisticts.max = NaN b.statisticts.max = NaN
} }
} }
*/
// func interpolate(idx int, data []Float) Float { // func interpolate(idx int, data []Float) Float {
// if idx == 0 || idx+1 == len(data) { // if idx == 0 || idx+1 == len(data) {
@ -362,7 +367,11 @@ func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore {
// Write all values in `metrics` to the level specified by `selector` for time `ts`. // Write all values in `metrics` to the level specified by `selector` for time `ts`.
// Look at `findLevelOrCreate` for how selectors work. // Look at `findLevelOrCreate` for how selectors work.
func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error { func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error {
l := m.root.findLevelOrCreate(selector, len(m.metrics)) return m.WriteToLevel(&m.root, selector, ts, metrics)
}
func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metrics []Metric) error {
l = l.findLevelOrCreate(selector, len(m.metrics))
l.lock.Lock() l.lock.Lock()
defer l.lock.Unlock() defer l.lock.Unlock()