diff --git a/lineprotocol.go b/lineprotocol.go index a5d99f6..5e84da0 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -117,13 +117,26 @@ func ReceiveNats(address string, handleLine func(dec *lineprotocol.Decoder) erro } func decodeLine(dec *lineprotocol.Decoder) error { + // Reduce allocations in loop: + t := time.Now() + metrics := make([]Metric, 0, 10) + selector := make([]string, 0, 4) + typeBuf, subTypeBuf := make([]byte, 0, 20), make([]byte, 0) + + // Optimize for the case where all lines in a "batch" are about the same + // cluster and host. By using `WriteToLevel` (level = host), we do not need + // to take the root- and cluster-level lock as often. + var hostLevel *level = nil + var prevCluster, prevHost string = "", "" + for dec.Next() { rawmeasurement, err := dec.Measurement() if err != nil { return err } - var cluster, host, typeName, typeId, subType, subTypeId string + var cluster, host string + var typeName, typeId, subType, subTypeId []byte for { key, val, err := dec.NextTag() if err != nil { @@ -133,38 +146,61 @@ func decodeLine(dec *lineprotocol.Decoder) error { break } + // The go compiler optimizes string([]byte{...}) == "...": switch string(key) { case "cluster": - cluster = string(val) + if string(val) == prevCluster { + cluster = prevCluster + } else { + cluster = string(val) + } case "hostname": - host = string(val) + if string(val) == prevHost { + host = prevHost + } else { + host = string(val) + } case "type": - typeName = string(val) + typeName = val case "type-id": - typeId = string(val) + typeId = val case "subtype": - subType = string(val) + subType = val case "stype-id": - subTypeId = string(val) + subTypeId = val default: // Ignore unkown tags (cc-metric-collector might send us a unit for example that we do not need) // return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val)) } } - selector := make([]string, 2, 4) - selector[0] = cluster - selector[1] = host + if hostLevel == nil || prevCluster != cluster || prevHost != host { + prevCluster = cluster + prevHost = host + selector = selector[:2] + selector[0] = cluster + selector[1] = host + hostLevel = memoryStore.root.findLevelOrCreate(selector, len(memoryStore.metrics)) + } + + selector = selector[:0] if len(typeId) > 0 { - selector = append(selector, typeName+typeId) + typeBuf = typeBuf[:0] + typeBuf = append(typeBuf, typeName...) + typeBuf = append(typeBuf, typeId...) + selector = append(selector, string(typeBuf)) // <- Allocation :( if len(subTypeId) > 0 { - selector = append(selector, subType+subTypeId) + subTypeBuf = subTypeBuf[:0] + subTypeBuf = append(subTypeBuf, subType...) + subTypeBuf = append(subTypeBuf, subTypeId...) + selector = append(selector, string(subTypeBuf)) } } - metrics := make([]Metric, 0, 10) - measurement := string(rawmeasurement) - if measurement == "data" { + metrics = metrics[:0] + // A more dense lp format if supported if the measurement is 'data'. + // In that case, the field keys are used as metric names. + if string(rawmeasurement) == "data" { for { key, val, err := dec.NextField() if err != nil { @@ -185,11 +221,12 @@ func decodeLine(dec *lineprotocol.Decoder) error { } metrics = append(metrics, Metric{ - Name: string(key), + Name: string(key), // <- Allocation :( Value: value, }) } } else { + measurement := string(rawmeasurement) // <- Allocation :( var value Float for { key, val, err := dec.NextField() @@ -220,13 +257,13 @@ func decodeLine(dec *lineprotocol.Decoder) error { }) } - t, err := dec.Time(lineprotocol.Second, time.Now()) + t, err = dec.Time(lineprotocol.Second, t) if err != nil { return err } // log.Printf("write: %s (%v) -> %v\n", string(measurement), selector, value) - if err := memoryStore.Write(selector, t.Unix(), metrics); err != nil { + if err := memoryStore.WriteToLevel(hostLevel, selector, t.Unix(), metrics); err != nil { return err } } diff --git a/memstore.go b/memstore.go index 5337fd8..339d195 100644 --- a/memstore.go +++ b/memstore.go @@ -2,7 +2,6 @@ package main import ( "errors" - "math" "sync" ) @@ -39,13 +38,15 @@ type buffer struct { prev, next *buffer // `prev` contains older data, `next` newer data. archived bool // If true, this buffer is already archived - closed bool - statisticts struct { - samples int - min Float - max Float - avg Float - } + closed bool + /* + statisticts struct { + samples int + min Float + max Float + avg Float + } + */ } func newBuffer(ts, freq int64) *buffer { @@ -104,6 +105,9 @@ func (b *buffer) end() int64 { return b.start + int64(len(b.data))*b.frequency } +func (b *buffer) close() {} + +/* func (b *buffer) close() { if b.closed { return @@ -134,6 +138,7 @@ func (b *buffer) close() { b.statisticts.max = NaN } } +*/ // func interpolate(idx int, data []Float) Float { // if idx == 0 || idx+1 == len(data) { @@ -362,7 +367,11 @@ func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { // Write all values in `metrics` to the level specified by `selector` for time `ts`. // Look at `findLevelOrCreate` for how selectors work. func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error { - l := m.root.findLevelOrCreate(selector, len(m.metrics)) + return m.WriteToLevel(&m.root, selector, ts, metrics) +} + +func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metrics []Metric) error { + l = l.findLevelOrCreate(selector, len(m.metrics)) l.lock.Lock() defer l.lock.Unlock()