From 89acbe8db27f56b4818ca5bc6bf6131f57024632 Mon Sep 17 00:00:00 2001 From: Thomas Roehl Date: Fri, 14 Apr 2023 18:24:22 +0200 Subject: [PATCH] Accept unit tags for incoming metrics and store it in the buffer. --- lineprotocol.go | 6 +++++- memstore.go | 24 ++++++++++++++---------- memstore_test.go | 44 ++++++++++++++++++++++---------------------- 3 files changed, 41 insertions(+), 33 deletions(-) diff --git a/lineprotocol.go b/lineprotocol.go index b2e1692..aecabc2 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -16,6 +16,7 @@ import ( type Metric struct { Name string Value Float + Unit string mc MetricConfig } @@ -206,7 +207,7 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { } typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0] - cluster, host := clusterDefault, "" + cluster, host, unit := clusterDefault, "", "" for { key, val, err := dec.NextTag() if err != nil { @@ -232,6 +233,8 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { host = string(val) lvl = nil } + case "unit": + unit = string(val) case "type": if string(val) == "node" { break @@ -300,6 +303,7 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { } else { return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) } + metric.Unit = unit } if t, err = dec.Time(lineprotocol.Second, t); err != nil { diff --git a/memstore.go b/memstore.go index 79b3f05..a896a65 100644 --- a/memstore.go +++ b/memstore.go @@ -35,6 +35,7 @@ var ( type buffer struct { frequency int64 // Time between two "slots" start int64 // Timestamp of when `data[0]` was written. + unit string // Unit for the data in this buffer data []Float // The slice should never reallocacte as `cap(data)` is respected. prev, next *buffer // `prev` contains older data, `next` newer data. archived bool // If true, this buffer is already archived @@ -50,12 +51,13 @@ type buffer struct { */ } -func newBuffer(ts, freq int64) *buffer { +func newBuffer(ts, freq int64, unit string) *buffer { b := bufferPool.Get().(*buffer) b.frequency = freq b.start = ts - (freq / 2) b.prev = nil b.next = nil + b.unit = unit b.archived = false b.closed = false b.data = b.data[:0] @@ -74,7 +76,7 @@ func (b *buffer) write(ts int64, value Float) (*buffer, error) { // idx := int((ts - b.start + (b.frequency / 3)) / b.frequency) idx := int((ts - b.start) / b.frequency) if idx >= cap(b.data) { - newbuf := newBuffer(ts, b.frequency) + newbuf := newBuffer(ts, b.frequency, b.unit) newbuf.prev = b b.next = newbuf b.close() @@ -412,7 +414,7 @@ func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metric b := l.metrics[metric.mc.offset] if b == nil { // First write to this metric and level - b = newBuffer(ts, metric.mc.Frequency) + b = newBuffer(ts, metric.mc.Frequency, metric.Unit) l.metrics[metric.mc.offset] = b } @@ -433,14 +435,15 @@ func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metric // If the level does not hold the metric itself, the data will be aggregated recursively from the children. // The second and third return value are the actual from/to for the data. Those can be different from // the range asked for if no data was available. -func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]Float, int64, int64, error) { +func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]Float, int64, int64, string, error) { + var unit string = "" if from > to { - return nil, 0, 0, errors.New("invalid time range") + return nil, 0, 0, "", errors.New("invalid time range") } minfo, ok := m.metrics[metric] if !ok { - return nil, 0, 0, errors.New("unkown metric: " + metric) + return nil, 0, 0, "", errors.New("unkown metric: " + metric) } n, data := 0, make([]Float, (to-from)/minfo.Frequency+1) @@ -449,6 +452,7 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([] if err != nil { return err } + unit = b.unit if n == 0 { from, to = cfrom, cto @@ -476,9 +480,9 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([] }) if err != nil { - return nil, 0, 0, err + return nil, 0, 0, "", err } else if n == 0 { - return nil, 0, 0, errors.New("metric or host not found") + return nil, 0, 0, "", errors.New("metric or host not found") } else if n > 1 { if minfo.Aggregation == AvgAggregation { normalize := 1. / Float(n) @@ -486,11 +490,11 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([] data[i] *= normalize } } else if minfo.Aggregation != SumAggregation { - return nil, 0, 0, errors.New("invalid aggregation") + return nil, 0, 0, "", errors.New("invalid aggregation") } } - return data, from, to, nil + return data, from, to, unit, nil } // Release all buffers for the selected level and all its children that contain only diff --git a/memstore_test.go b/memstore_test.go index 8821e7a..d17ba83 100644 --- a/memstore_test.go +++ b/memstore_test.go @@ -28,12 +28,12 @@ func TestMemoryStoreBasics(t *testing.T) { } sel := Selector{{String: "testhost"}} - adata, from, to, err := store.Read(sel, "a", start, start+count*frequency) - if err != nil || from != start || to != start+count*frequency { + adata, from, to, unit, err := store.Read(sel, "a", start, start+count*frequency) + if err != nil || from != start || to != start+count*frequency || unit != "" { t.Error(err) return } - bdata, _, _, err := store.Read(sel, "b", start, start+count*frequency) + bdata, _, _, unit, err := store.Read(sel, "b", start, start+count*frequency) if err != nil { t.Error(err) return @@ -83,23 +83,23 @@ func TestMemoryStoreTooMuchWrites(t *testing.T) { } end := start + int64(count)*frequency - data, from, to, err := store.Read(Selector{{String: "test"}}, "a", start, end) - if len(data) != count || from != start || to != end || err != nil { + data, from, to, unit, err := store.Read(Selector{{String: "test"}}, "a", start, end) + if len(data) != count || from != start || to != end || err != nil || unit != "" { t.Fatalf("a: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) } - data, from, to, err = store.Read(Selector{{String: "test"}}, "b", start, end) - if len(data) != count/2 || from != start || to != end || err != nil { + data, from, to, unit, err = store.Read(Selector{{String: "test"}}, "b", start, end) + if len(data) != count/2 || from != start || to != end || err != nil || unit != "" { t.Fatalf("b: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) } - data, from, to, err = store.Read(Selector{{String: "test"}}, "c", start, end) - if len(data) != count*2-1 || from != start || to != end-frequency/2 || err != nil { + data, from, to, unit, err = store.Read(Selector{{String: "test"}}, "c", start, end) + if len(data) != count*2-1 || from != start || to != end-frequency/2 || err != nil || unit != "" { t.Fatalf("c: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) } - data, from, to, err = store.Read(Selector{{String: "test"}}, "d", start, end) - if len(data) != count/3+1 || from != start || to != end+frequency*2 || err != nil { + data, from, to, unit, err = store.Read(Selector{{String: "test"}}, "d", start, end) + if len(data) != count/3+1 || from != start || to != end+frequency*2 || err != nil || unit != "" { t.Errorf("expected: err=nil, from=%d, to=%d, len(data)=%d\n", start, end+frequency*2, count/3) t.Fatalf("d: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) } @@ -121,7 +121,7 @@ func TestMemoryStoreOutOfBounds(t *testing.T) { } sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu"}} - data, from, to, err := store.Read(sel, "a", 500, int64(toffset+count*60+500)) + data, from, to, unit, err := store.Read(sel, "a", 500, int64(toffset+count*60+500)) if err != nil { t.Fatal(err) } @@ -136,14 +136,14 @@ func TestMemoryStoreOutOfBounds(t *testing.T) { } testfrom, testlen := int64(100000000), int64(10000) - data, from, to, err = store.Read(sel, "a", testfrom, testfrom+testlen) - if len(data) != 0 || from != testfrom || to != testfrom || err != nil { + data, from, to, unit, err = store.Read(sel, "a", testfrom, testfrom+testlen) + if len(data) != 0 || from != testfrom || to != testfrom || err != nil || unit != "" { t.Fatal("Unexpected data returned when reading range after valid data") } testfrom, testlen = 0, 10 - data, from, to, err = store.Read(sel, "a", testfrom, testfrom+testlen) - if len(data) != 0 || from/60 != int64(toffset)/60 || to/60 != int64(toffset)/60 || err != nil { + data, from, to, unit, err = store.Read(sel, "a", testfrom, testfrom+testlen) + if len(data) != 0 || from/60 != int64(toffset)/60 || to/60 != int64(toffset)/60 || err != nil || unit != "" { t.Fatal("Unexpected data returned when reading range before valid data") } } @@ -169,7 +169,7 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) { } sel := Selector{{String: "testhost"}} - adata, _, _, err := store.Read(sel, "a", 0, int64(count)) + adata, _, _, _, err := store.Read(sel, "a", 0, int64(count)) if err != nil { t.Error(err) return @@ -219,7 +219,7 @@ func TestMemoryStoreAggregation(t *testing.T) { } } - adata, from, to, err := store.Read(Selector{{String: "host0"}}, "a", int64(0), int64(count)) + adata, from, to, _, err := store.Read(Selector{{String: "host0"}}, "a", int64(0), int64(count)) if err != nil { t.Error(err) return @@ -352,7 +352,7 @@ func TestMemoryStoreArchive(t *testing.T) { } sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu0"}} - adata, from, to, err := store2.Read(sel, "a", 100, int64(100+count)) + adata, from, to, _, err := store2.Read(sel, "a", 100, int64(100+count)) if err != nil { t.Error(err) return @@ -398,7 +398,7 @@ func TestMemoryStoreFree(t *testing.T) { t.Fatal("two buffers expected to be released") } - adata, from, to, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "a", 0, int64(count)) + adata, from, to, _, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "a", 0, int64(count)) if err != nil { t.Fatal(err) } @@ -451,7 +451,7 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) { for g := 0; g < goroutines; g++ { host := fmt.Sprintf("host%d", g) sel := Selector{{String: "cluster"}, {String: host}, {String: "cpu0"}} - adata, _, _, err := store.Read(sel, "a", 0, int64(count)*frequency) + adata, _, _, _, err := store.Read(sel, "a", 0, int64(count)*frequency) if err != nil { b.Error(err) return @@ -500,7 +500,7 @@ func BenchmarkMemoryStoreAggregation(b *testing.B) { b.StartTimer() for n := 0; n < b.N; n++ { - data, from, to, err := store.Read(Selector{{String: "testcluster"}, {String: "host123"}}, "flops_any", 0, int64(count)) + data, from, to, _, err := store.Read(Selector{{String: "testcluster"}, {String: "host123"}}, "flops_any", 0, int64(count)) if err != nil { b.Fatal(err) }