Accept unit tags for incoming metrics and store it in the buffer.

This commit is contained in:
Thomas Roehl 2023-04-14 18:24:22 +02:00
parent c8c6560040
commit 89acbe8db2
3 changed files with 41 additions and 33 deletions

View File

@ -16,6 +16,7 @@ import (
type Metric struct { type Metric struct {
Name string Name string
Value Float Value Float
Unit string
mc MetricConfig mc MetricConfig
} }
@ -206,7 +207,7 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error {
} }
typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0] typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0]
cluster, host := clusterDefault, "" cluster, host, unit := clusterDefault, "", ""
for { for {
key, val, err := dec.NextTag() key, val, err := dec.NextTag()
if err != nil { if err != nil {
@ -232,6 +233,8 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error {
host = string(val) host = string(val)
lvl = nil lvl = nil
} }
case "unit":
unit = string(val)
case "type": case "type":
if string(val) == "node" { if string(val) == "node" {
break break
@ -300,6 +303,7 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error {
} else { } else {
return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) return fmt.Errorf("unsupported value type in message: %s", val.Kind().String())
} }
metric.Unit = unit
} }
if t, err = dec.Time(lineprotocol.Second, t); err != nil { if t, err = dec.Time(lineprotocol.Second, t); err != nil {

View File

@ -35,6 +35,7 @@ var (
type buffer struct { type buffer struct {
frequency int64 // Time between two "slots" frequency int64 // Time between two "slots"
start int64 // Timestamp of when `data[0]` was written. start int64 // Timestamp of when `data[0]` was written.
unit string // Unit for the data in this buffer
data []Float // The slice should never reallocacte as `cap(data)` is respected. data []Float // The slice should never reallocacte as `cap(data)` is respected.
prev, next *buffer // `prev` contains older data, `next` newer data. prev, next *buffer // `prev` contains older data, `next` newer data.
archived bool // If true, this buffer is already archived archived bool // If true, this buffer is already archived
@ -50,12 +51,13 @@ type buffer struct {
*/ */
} }
func newBuffer(ts, freq int64) *buffer { func newBuffer(ts, freq int64, unit string) *buffer {
b := bufferPool.Get().(*buffer) b := bufferPool.Get().(*buffer)
b.frequency = freq b.frequency = freq
b.start = ts - (freq / 2) b.start = ts - (freq / 2)
b.prev = nil b.prev = nil
b.next = nil b.next = nil
b.unit = unit
b.archived = false b.archived = false
b.closed = false b.closed = false
b.data = b.data[:0] b.data = b.data[:0]
@ -74,7 +76,7 @@ func (b *buffer) write(ts int64, value Float) (*buffer, error) {
// idx := int((ts - b.start + (b.frequency / 3)) / b.frequency) // idx := int((ts - b.start + (b.frequency / 3)) / b.frequency)
idx := int((ts - b.start) / b.frequency) idx := int((ts - b.start) / b.frequency)
if idx >= cap(b.data) { if idx >= cap(b.data) {
newbuf := newBuffer(ts, b.frequency) newbuf := newBuffer(ts, b.frequency, b.unit)
newbuf.prev = b newbuf.prev = b
b.next = newbuf b.next = newbuf
b.close() b.close()
@ -412,7 +414,7 @@ func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metric
b := l.metrics[metric.mc.offset] b := l.metrics[metric.mc.offset]
if b == nil { if b == nil {
// First write to this metric and level // First write to this metric and level
b = newBuffer(ts, metric.mc.Frequency) b = newBuffer(ts, metric.mc.Frequency, metric.Unit)
l.metrics[metric.mc.offset] = b l.metrics[metric.mc.offset] = b
} }
@ -433,14 +435,15 @@ func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metric
// If the level does not hold the metric itself, the data will be aggregated recursively from the children. // If the level does not hold the metric itself, the data will be aggregated recursively from the children.
// The second and third return value are the actual from/to for the data. Those can be different from // The second and third return value are the actual from/to for the data. Those can be different from
// the range asked for if no data was available. // the range asked for if no data was available.
func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]Float, int64, int64, error) { func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]Float, int64, int64, string, error) {
var unit string = ""
if from > to { if from > to {
return nil, 0, 0, errors.New("invalid time range") return nil, 0, 0, "", errors.New("invalid time range")
} }
minfo, ok := m.metrics[metric] minfo, ok := m.metrics[metric]
if !ok { if !ok {
return nil, 0, 0, errors.New("unkown metric: " + metric) return nil, 0, 0, "", errors.New("unkown metric: " + metric)
} }
n, data := 0, make([]Float, (to-from)/minfo.Frequency+1) n, data := 0, make([]Float, (to-from)/minfo.Frequency+1)
@ -449,6 +452,7 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]
if err != nil { if err != nil {
return err return err
} }
unit = b.unit
if n == 0 { if n == 0 {
from, to = cfrom, cto from, to = cfrom, cto
@ -476,9 +480,9 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]
}) })
if err != nil { if err != nil {
return nil, 0, 0, err return nil, 0, 0, "", err
} else if n == 0 { } else if n == 0 {
return nil, 0, 0, errors.New("metric or host not found") return nil, 0, 0, "", errors.New("metric or host not found")
} else if n > 1 { } else if n > 1 {
if minfo.Aggregation == AvgAggregation { if minfo.Aggregation == AvgAggregation {
normalize := 1. / Float(n) normalize := 1. / Float(n)
@ -486,11 +490,11 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]
data[i] *= normalize data[i] *= normalize
} }
} else if minfo.Aggregation != SumAggregation { } else if minfo.Aggregation != SumAggregation {
return nil, 0, 0, errors.New("invalid aggregation") return nil, 0, 0, "", errors.New("invalid aggregation")
} }
} }
return data, from, to, nil return data, from, to, unit, nil
} }
// Release all buffers for the selected level and all its children that contain only // Release all buffers for the selected level and all its children that contain only

View File

@ -28,12 +28,12 @@ func TestMemoryStoreBasics(t *testing.T) {
} }
sel := Selector{{String: "testhost"}} sel := Selector{{String: "testhost"}}
adata, from, to, err := store.Read(sel, "a", start, start+count*frequency) adata, from, to, unit, err := store.Read(sel, "a", start, start+count*frequency)
if err != nil || from != start || to != start+count*frequency { if err != nil || from != start || to != start+count*frequency || unit != "" {
t.Error(err) t.Error(err)
return return
} }
bdata, _, _, err := store.Read(sel, "b", start, start+count*frequency) bdata, _, _, unit, err := store.Read(sel, "b", start, start+count*frequency)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -83,23 +83,23 @@ func TestMemoryStoreTooMuchWrites(t *testing.T) {
} }
end := start + int64(count)*frequency end := start + int64(count)*frequency
data, from, to, err := store.Read(Selector{{String: "test"}}, "a", start, end) data, from, to, unit, err := store.Read(Selector{{String: "test"}}, "a", start, end)
if len(data) != count || from != start || to != end || err != nil { if len(data) != count || from != start || to != end || err != nil || unit != "" {
t.Fatalf("a: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) t.Fatalf("a: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
} }
data, from, to, err = store.Read(Selector{{String: "test"}}, "b", start, end) data, from, to, unit, err = store.Read(Selector{{String: "test"}}, "b", start, end)
if len(data) != count/2 || from != start || to != end || err != nil { if len(data) != count/2 || from != start || to != end || err != nil || unit != "" {
t.Fatalf("b: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) t.Fatalf("b: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
} }
data, from, to, err = store.Read(Selector{{String: "test"}}, "c", start, end) data, from, to, unit, err = store.Read(Selector{{String: "test"}}, "c", start, end)
if len(data) != count*2-1 || from != start || to != end-frequency/2 || err != nil { if len(data) != count*2-1 || from != start || to != end-frequency/2 || err != nil || unit != "" {
t.Fatalf("c: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) t.Fatalf("c: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
} }
data, from, to, err = store.Read(Selector{{String: "test"}}, "d", start, end) data, from, to, unit, err = store.Read(Selector{{String: "test"}}, "d", start, end)
if len(data) != count/3+1 || from != start || to != end+frequency*2 || err != nil { if len(data) != count/3+1 || from != start || to != end+frequency*2 || err != nil || unit != "" {
t.Errorf("expected: err=nil, from=%d, to=%d, len(data)=%d\n", start, end+frequency*2, count/3) t.Errorf("expected: err=nil, from=%d, to=%d, len(data)=%d\n", start, end+frequency*2, count/3)
t.Fatalf("d: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) t.Fatalf("d: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
} }
@ -121,7 +121,7 @@ func TestMemoryStoreOutOfBounds(t *testing.T) {
} }
sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu"}} sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu"}}
data, from, to, err := store.Read(sel, "a", 500, int64(toffset+count*60+500)) data, from, to, unit, err := store.Read(sel, "a", 500, int64(toffset+count*60+500))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -136,14 +136,14 @@ func TestMemoryStoreOutOfBounds(t *testing.T) {
} }
testfrom, testlen := int64(100000000), int64(10000) testfrom, testlen := int64(100000000), int64(10000)
data, from, to, err = store.Read(sel, "a", testfrom, testfrom+testlen) data, from, to, unit, err = store.Read(sel, "a", testfrom, testfrom+testlen)
if len(data) != 0 || from != testfrom || to != testfrom || err != nil { if len(data) != 0 || from != testfrom || to != testfrom || err != nil || unit != "" {
t.Fatal("Unexpected data returned when reading range after valid data") t.Fatal("Unexpected data returned when reading range after valid data")
} }
testfrom, testlen = 0, 10 testfrom, testlen = 0, 10
data, from, to, err = store.Read(sel, "a", testfrom, testfrom+testlen) data, from, to, unit, err = store.Read(sel, "a", testfrom, testfrom+testlen)
if len(data) != 0 || from/60 != int64(toffset)/60 || to/60 != int64(toffset)/60 || err != nil { if len(data) != 0 || from/60 != int64(toffset)/60 || to/60 != int64(toffset)/60 || err != nil || unit != "" {
t.Fatal("Unexpected data returned when reading range before valid data") t.Fatal("Unexpected data returned when reading range before valid data")
} }
} }
@ -169,7 +169,7 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) {
} }
sel := Selector{{String: "testhost"}} sel := Selector{{String: "testhost"}}
adata, _, _, err := store.Read(sel, "a", 0, int64(count)) adata, _, _, _, err := store.Read(sel, "a", 0, int64(count))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -219,7 +219,7 @@ func TestMemoryStoreAggregation(t *testing.T) {
} }
} }
adata, from, to, err := store.Read(Selector{{String: "host0"}}, "a", int64(0), int64(count)) adata, from, to, _, err := store.Read(Selector{{String: "host0"}}, "a", int64(0), int64(count))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -352,7 +352,7 @@ func TestMemoryStoreArchive(t *testing.T) {
} }
sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu0"}} sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu0"}}
adata, from, to, err := store2.Read(sel, "a", 100, int64(100+count)) adata, from, to, _, err := store2.Read(sel, "a", 100, int64(100+count))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
return return
@ -398,7 +398,7 @@ func TestMemoryStoreFree(t *testing.T) {
t.Fatal("two buffers expected to be released") t.Fatal("two buffers expected to be released")
} }
adata, from, to, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "a", 0, int64(count)) adata, from, to, _, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "a", 0, int64(count))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -451,7 +451,7 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) {
for g := 0; g < goroutines; g++ { for g := 0; g < goroutines; g++ {
host := fmt.Sprintf("host%d", g) host := fmt.Sprintf("host%d", g)
sel := Selector{{String: "cluster"}, {String: host}, {String: "cpu0"}} sel := Selector{{String: "cluster"}, {String: host}, {String: "cpu0"}}
adata, _, _, err := store.Read(sel, "a", 0, int64(count)*frequency) adata, _, _, _, err := store.Read(sel, "a", 0, int64(count)*frequency)
if err != nil { if err != nil {
b.Error(err) b.Error(err)
return return
@ -500,7 +500,7 @@ func BenchmarkMemoryStoreAggregation(b *testing.B) {
b.StartTimer() b.StartTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
data, from, to, err := store.Read(Selector{{String: "testcluster"}, {String: "host123"}}, "flops_any", 0, int64(count)) data, from, to, _, err := store.Read(Selector{{String: "testcluster"}, {String: "host123"}}, "flops_any", 0, int64(count))
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }