From b55a67f869b713c1706308129a62697af04360a5 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Wed, 8 Sep 2021 12:17:10 +0200 Subject: [PATCH] New selector type for better selection of sockets/cpus --- api.go | 4 +- memstore.go | 127 ++++++++++++----------------------------------- memstore_test.go | 30 ++++++----- metric-store.go | 2 +- selector.go | 95 +++++++++++++++++++++++++++++++++++ stats.go | 50 ++++++++++++++++--- 6 files changed, 189 insertions(+), 119 deletions(-) create mode 100644 selector.go diff --git a/api.go b/api.go index 92314de..00593c4 100644 --- a/api.go +++ b/api.go @@ -19,7 +19,7 @@ import ( // } type ApiRequestBody struct { Metrics []string `json:"metrics"` - Selectors [][]string `json:"selectors"` + Selectors []Selector `json:"selectors"` } type ApiMetricData struct { @@ -165,7 +165,7 @@ func handleFree(rw http.ResponseWriter, r *http.Request) { } bodyDec := json.NewDecoder(r.Body) - var selectors [][]string + var selectors []Selector err = bodyDec.Decode(&selectors) if err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) diff --git a/memstore.go b/memstore.go index 81966bf..ed53e86 100644 --- a/memstore.go +++ b/memstore.go @@ -206,89 +206,6 @@ func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level { return child.findLevelOrCreate(selector[1:], nMetrics) } -// This function assmumes that `l.lock` is LOCKED! -// Read `buffer.read` for context. -// If this level does not have data for the requested metric, the data -// is aggregated timestep-wise from all the children (recursively). -func (l *level) read(offset int, from, to int64, data []Float) ([]Float, int, int64, int64, error) { - if b := l.metrics[offset]; b != nil { - // Whoo, this is the "native" level of this metric: - data, from, to, err := b.read(from, to, data) - return data, 1, from, to, err - } - - if len(l.children) == 0 { - return nil, 1, 0, 0, ErrNoData - } - - n := 0 - for _, child := range l.children { - child.lock.RLock() - cdata, cn, cfrom, cto, err := child.read(offset, from, to, data) - child.lock.RUnlock() - - if err == ErrNoData { - continue - } - - if err != nil { - return nil, 0, 0, 0, err - } - - if n == 0 { - data = cdata - from = cfrom - to = cto - n += cn - continue - } - - if cfrom != from || cto != to { - return nil, 0, 0, 0, ErrDataDoesNotAlign - } - - if len(data) != len(cdata) { - panic("WTF? Different freq. at different levels?") - } - - n += cn - } - - if n == 0 { - return nil, 0, 0, 0, ErrNoData - } - - return data, n, from, to, nil -} - -func (l *level) free(t int64) (int, error) { - l.lock.Lock() - defer l.lock.Unlock() - - n := 0 - for _, b := range l.metrics { - if b == nil { - continue - } - - m, err := b.free(t) - n += m - if err != nil { - return n, err - } - } - - for _, l := range l.children { - m, err := l.free(t) - n += m - if err != nil { - return n, err - } - } - - return n, nil -} - type AggregationStrategy int const ( @@ -382,11 +299,7 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error // Returns all values for metric `metric` from `from` to `to` for the selected level. // If the level does not hold the metric itself, the data will be aggregated recursively from the children. // See `level.read` for more information. -func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([]Float, int64, int64, error) { - l := m.root.findLevelOrCreate(selector, len(m.metrics)) - l.lock.RLock() - defer l.lock.RUnlock() - +func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]Float, int64, int64, error) { if from > to { return nil, 0, 0, errors.New("invalid time range") } @@ -396,13 +309,29 @@ func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([] return nil, 0, 0, errors.New("unkown metric: " + metric) } - data := make([]Float, (to-from)/minfo.frequency+1) - data, n, from, to, err := l.read(minfo.offset, from, to, data) + n, data := 0, make([]Float, (to-from)/minfo.frequency+1) + err := m.root.findBuffers(selector, minfo.offset, func(b *buffer) error { + cdata, cfrom, cto, err := b.read(from, to, data) + if err != nil { + return err + } + + if n == 0 { + from, to = cfrom, cto + } else if from != cfrom || to != cto || len(data) != len(cdata) { + return ErrDataDoesNotAlign + } + + data = cdata + n += 1 + return nil + }) + if err != nil { return nil, 0, 0, err - } - - if n > 1 { + } else if n == 0 { + return nil, 0, 0, errors.New("metric not found") + } else if n > 1 { if minfo.aggregation == AvgAggregation { normalize := 1. / Float(n) for i := 0; i < len(data); i++ { @@ -413,11 +342,17 @@ func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([] } } - return data, from, to, err + return data, from, to, nil } // Release all buffers for the selected level and all its children that contain only // values older than `t`. -func (m *MemoryStore) Free(selector []string, t int64) (int, error) { - return m.root.findLevelOrCreate(selector, len(m.metrics)).free(t) +func (m *MemoryStore) Free(selector Selector, t int64) (int, error) { + n := 0 + err := m.root.findBuffers(selector, -1, func(b *buffer) error { + m, err := b.free(t) + n += m + return err + }) + return n, err } diff --git a/memstore_test.go b/memstore_test.go index 857f61f..50b7102 100644 --- a/memstore_test.go +++ b/memstore_test.go @@ -27,12 +27,13 @@ func TestMemoryStoreBasics(t *testing.T) { } } - adata, from, to, err := store.Read([]string{"testhost"}, "a", 0, count*frequency) + sel := Selector{{String: "testhost"}} + adata, from, to, err := store.Read(sel, "a", 0, count*frequency) if err != nil || from != 0 || to != count*frequency { t.Error(err) return } - bdata, _, _, err := store.Read([]string{"testhost"}, "b", 0, count*frequency) + bdata, _, _, err := store.Read(sel, "b", 0, count*frequency) if err != nil { t.Error(err) return @@ -80,7 +81,8 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) { } } - adata, _, _, err := store.Read([]string{"testhost"}, "a", 0, int64(count)) + sel := Selector{{String: "testhost"}} + adata, _, _, err := store.Read(sel, "a", 0, int64(count)) if err != nil { t.Error(err) return @@ -133,7 +135,7 @@ func TestMemoryStoreAggregation(t *testing.T) { } } - adata, from, to, err := store.Read([]string{"host0"}, "a", int64(0), int64(count)) + adata, from, to, err := store.Read(Selector{{String: "host0"}}, "a", int64(0), int64(count)) if err != nil { t.Error(err) return @@ -152,7 +154,7 @@ func TestMemoryStoreAggregation(t *testing.T) { } } - bdata, from, to, err := store.Read([]string{"host0"}, "b", int64(0), int64(count)) + bdata, from, to, err := store.Read(Selector{{String: "host0"}}, "b", int64(0), int64(count)) if err != nil { t.Error(err) return @@ -215,7 +217,7 @@ func TestMemoryStoreStats(t *testing.T) { }) } - stats, from, to, err := store.Stats(sel1, "a", 0, int64(count)) + stats, from, to, err := store.Stats(Selector{{String: "cluster"}, {String: "host1"}}, "a", 0, int64(count)) if err != nil { t.Fatal(err) } @@ -228,7 +230,7 @@ func TestMemoryStoreStats(t *testing.T) { t.Fatalf("wrong stats: %#v\n", stats) } - stats, from, to, err = store.Stats([]string{"cluster", "host2"}, "b", 0, int64(count)) + stats, from, to, err = store.Stats(Selector{{String: "cluster"}, {String: "host2"}}, "b", 0, int64(count)) if err != nil { t.Fatal(err) } @@ -283,7 +285,8 @@ func TestMemoryStoreArchive(t *testing.T) { return } - adata, from, to, err := store2.Read([]string{"cluster", "host", "cpu0"}, "a", 100, int64(100+count)) + sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu0"}} + adata, from, to, err := store2.Read(sel, "a", 100, int64(100+count)) if err != nil { t.Error(err) return @@ -320,7 +323,7 @@ func TestMemoryStoreFree(t *testing.T) { } } - n, err := store.Free([]string{"cluster", "host"}, int64(BUFFER_CAP*2)+100) + n, err := store.Free(Selector{{String: "cluster"}, {String: "host"}}, int64(BUFFER_CAP*2)+100) if err != nil { t.Fatal(err) } @@ -329,7 +332,7 @@ func TestMemoryStoreFree(t *testing.T) { t.Fatal("two buffers expected to be released") } - adata, from, to, err := store.Read([]string{"cluster", "host", "1"}, "a", 0, int64(count)) + adata, from, to, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "a", 0, int64(count)) if err != nil { t.Fatal(err) } @@ -338,7 +341,7 @@ func TestMemoryStoreFree(t *testing.T) { t.Fatalf("unexpected values from call to `Read`: from=%d, to=%d, len=%d", from, to, len(adata)) } - bdata, from, to, err := store.Read([]string{"cluster", "host", "1"}, "b", 0, int64(count)) + bdata, from, to, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "b", 0, int64(count)) if err != nil { t.Fatal(err) } @@ -380,7 +383,8 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) { for g := 0; g < goroutines; g++ { host := fmt.Sprintf("host%d", g) - adata, _, _, err := store.Read([]string{"cluster", host, "cpu0"}, "a", 0, int64(count)*frequency) + sel := Selector{{String: "cluster"}, {String: host}, {String: "cpu0"}} + adata, _, _, err := store.Read(sel, "a", 0, int64(count)*frequency) if err != nil { b.Error(err) return @@ -429,7 +433,7 @@ func BenchmarkMemoryStoreAggregation(b *testing.B) { b.StartTimer() for n := 0; n < b.N; n++ { - data, from, to, err := store.Read(sel[0:2], "flops_any", 0, int64(count)) + data, from, to, err := store.Read(Selector{{String: "testcluster"}, {String: "host123"}}, "flops_any", 0, int64(count)) if err != nil { b.Fatal(err) } diff --git a/metric-store.go b/metric-store.go index 1ffd2c4..f13b6c4 100644 --- a/metric-store.go +++ b/metric-store.go @@ -124,7 +124,7 @@ func main() { if conf.RetentionHours > 0 { log.Println("Freeing up memory...") t := now.Add(-time.Duration(conf.RetentionHours) * time.Hour) - freed, err := memoryStore.Free([]string{}, t.Unix()) + freed, err := memoryStore.Free(Selector{}, t.Unix()) if err != nil { log.Printf("Freeing up memory failed: %s\n", err.Error()) } diff --git a/selector.go b/selector.go new file mode 100644 index 0000000..e857dee --- /dev/null +++ b/selector.go @@ -0,0 +1,95 @@ +package main + +import ( + "encoding/json" + "errors" +) + +type SelectorElement struct { + String string + Group []string +} + +func (se *SelectorElement) UnmarshalJSON(input []byte) error { + if input[0] == '"' { + return json.Unmarshal(input, &se.String) + } + + if input[0] == '[' { + return json.Unmarshal(input, &se.Group) + } + + return errors.New("the Go SelectorElement type can only be a string or an array of strings") +} + +func (se *SelectorElement) MarshalJSON() ([]byte, error) { + if se.String != "" { + return json.Marshal(se.String) + } + + if se.Group != nil { + return json.Marshal(se.Group) + } + + return nil, errors.New("a Go Selector must be a non-empty string or a non-empty slice of strings") +} + +type Selector []SelectorElement + +func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) error) error { + l.lock.RLock() + defer l.lock.RUnlock() + + if len(selector) == 0 { + if offset == -1 { + for _, b := range l.metrics { + if b != nil { + err := f(b) + if err != nil { + return err + } + } + } + } else { + b := l.metrics[offset] + if b != nil { + return f(b) + } + } + + for _, lvl := range l.children { + err := lvl.findBuffers(nil, offset, f) + if err != nil { + return err + } + } + return nil + } + + sel := selector[0] + if len(sel.String) != 0 { + lvl, ok := l.children[sel.String] + if ok { + err := lvl.findBuffers(selector[1:], offset, f) + if err != nil { + return err + } + } + return nil + } + + if sel.Group != nil { + for _, key := range sel.Group { + lvl, ok := l.children[key] + if ok { + err := lvl.findBuffers(selector[1:], offset, f) + if err != nil { + return err + } + } + } + return nil + } + + panic("impossible") +} diff --git a/stats.go b/stats.go index 53e4e9a..2e13b66 100644 --- a/stats.go +++ b/stats.go @@ -118,11 +118,7 @@ func (l *level) stats(offset int, from, to int64, aggreg AggregationStrategy) (S }, from, to, nil } -func (m *MemoryStore) Stats(selector []string, metric string, from, to int64) (*Stats, int64, int64, error) { - l := m.root.findLevelOrCreate(selector, len(m.metrics)) - l.lock.RLock() - defer l.lock.RUnlock() - +func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (*Stats, int64, int64, error) { if from > to { return nil, 0, 0, errors.New("invalid time range") } @@ -132,6 +128,46 @@ func (m *MemoryStore) Stats(selector []string, metric string, from, to int64) (* return nil, 0, 0, errors.New("unkown metric: " + metric) } - stats, from, to, err := l.stats(minfo.offset, from, to, minfo.aggregation) - return &stats, from, to, err + n, samples := 0, 0 + avg, min, max := Float(0), math.MaxFloat32, -math.MaxFloat32 + err := m.root.findBuffers(selector, minfo.offset, func(b *buffer) error { + stats, cfrom, cto, err := b.stats(from, to) + if err != nil { + return err + } + + if n == 0 { + from, to = cfrom, cto + } else if from != cfrom || to != cto { + return ErrDataDoesNotAlign + } + + samples += stats.Samples + avg += stats.Avg + min = math.Min(min, float64(stats.Min)) + max = math.Max(max, float64(stats.Max)) + n += 1 + return nil + }) + + if err != nil { + return nil, 0, 0, err + } + + if n == 0 { + return nil, 0, 0, ErrNoData + } + + if minfo.aggregation == AvgAggregation { + avg /= Float(n) + } else if n > 1 && minfo.aggregation != SumAggregation { + return nil, 0, 0, errors.New("invalid aggregation") + } + + return &Stats{ + Samples: samples, + Avg: avg, + Min: Float(min), + Max: Float(max), + }, from, to, nil }