diff --git a/memstore.go b/memstore.go
index ee79492..80f2943 100644
--- a/memstore.go
+++ b/memstore.go
@@ -8,7 +8,7 @@ import (
 // Default buffer capacity.
 // `buffer.data` will only ever grow up to it's capacity and a new link
 // in the buffer chain will be created if needed so that no copying
-// of needs to happen on writes.
+// of data or reallocation needs to happen on writes.
 const (
 	BUFFER_CAP int = 1024
 )
@@ -20,6 +20,11 @@ var bufferPool sync.Pool = sync.Pool{
 	},
 }
 
+var (
+	ErrNoData           error = errors.New("no data for this metric/level")
+	ErrDataDoesNotAlign error = errors.New("data from lower granularities does not align")
+)
+
 // Each metric on each level has it's own buffer.
 // This is where the actual values go.
 // If `cap(data)` is reached, a new buffer is created and
@@ -80,18 +85,19 @@ func (b *buffer) write(ts int64, value Float) (*buffer, error) {
 // the actual `from`/`to`.
 // This function goes back the buffer chain if `from` is older than the
 // currents buffer start.
-func (b *buffer) read(from, to int64) ([]Float, int64, int64, error) {
+// The loaded values are added to `data` and `data` is returned, possibly with a shorter length.
+// If `data` is not long enough to hold all values, this function will panic!
+func (b *buffer) read(from, to int64, data []Float) ([]Float, int64, int64, error) {
 	if from < b.start {
 		if b.prev != nil {
-			return b.prev.read(from, to)
+			return b.prev.read(from, to, data)
 		}
 		from = b.start
 	}
 
-	data := make([]Float, 0, (to-from)/b.frequency+1)
-
-	var t int64
-	for t = from; t < to; t += b.frequency {
+	var i int = 0
+	var t int64 = from
+	for ; t < to; t += b.frequency {
 		idx := int((t - b.start) / b.frequency)
 		if idx >= cap(b.data) {
 			b = b.next
@@ -102,13 +108,44 @@ func (b *buffer) read(from, to int64) ([]Float, int64, int64, error) {
 		}
 
 		if t < b.start || idx >= len(b.data) {
-			data = append(data, NaN)
+			data[i] += NaN
 		} else {
-			data = append(data, b.data[idx])
+			data[i] += b.data[idx]
 		}
+		i++
 	}
 
-	return data, from, t, nil
+	return data[:i], from, t, nil
+}
+
+// Free all buffers in the chain that only contain data
+// older than `t`.
+func (b *buffer) free(t int64) (int, error) {
+	end := b.start + int64(len(b.data))*b.frequency
+	if end < t && b.next != nil {
+		b.next.prev = nil
+		n := 0
+		for b != nil {
+			prev := b.prev
+			if prev != nil && prev.start > b.start {
+				panic("time travel?")
+			}
+
+			n += 1
+			bufferPool.Put(b.data)
+			b.data = nil
+			b.next = nil
+			b.prev = nil
+			b = prev
+		}
+		return n, nil
+	}
+
+	if b.prev != nil {
+		return b.prev.free(t)
+	}
+
+	return 0, nil
 }
 
 // Could also be called "node" as this forms a node in a tree structure.
@@ -158,77 +195,82 @@ func (l *level) findLevelOrCreate(selector []string) *level {
 }
 
 // This function assmumes that `l.lock` is LOCKED!
-// Read `buffer.read` for context. This function does
-// a lot of short-lived allocations and copies if this is
-// not the "native" level for the requested metric. There
-// is a lot of optimization potential here!
+// Read `buffer.read` for context.
 // If this level does not have data for the requested metric, the data
 // is aggregated timestep-wise from all the children (recursively).
-// Optimization suggestion: Pass a buffer as argument onto which the values should be added.
-func (l *level) read(metric string, from, to int64, aggregation string) ([]Float, int64, int64, error) {
+func (l *level) read(metric string, from, to int64, data []Float) ([]Float, int, int64, int64, error) {
 	if b, ok := l.metrics[metric]; ok {
 		// Whoo, this is the "native" level of this metric:
-		return b.read(from, to)
+		data, from, to, err := b.read(from, to, data)
+		return data, 1, from, to, err
 	}
 
 	if len(l.children) == 0 {
-		return nil, 0, 0, errors.New("no data for that metric/level")
+		return nil, 1, 0, 0, ErrNoData
 	}
 
-	if len(l.children) == 1 {
-		for _, child := range l.children {
-			child.lock.Lock()
-			data, from, to, err := child.read(metric, from, to, aggregation)
-			child.lock.Unlock()
-			return data, from, to, err
-		}
-	}
-
-	// "slow" case: We need to accumulate metrics accross levels/scopes/tags/whatever.
-	var data []Float = nil
+	n := 0
 	for _, child := range l.children {
-		child.lock.Lock()
-		cdata, cfrom, cto, err := child.read(metric, from, to, aggregation)
-		child.lock.Unlock()
+		child.lock.RLock()
+		cdata, cn, cfrom, cto, err := child.read(metric, from, to, data)
+		child.lock.RUnlock()
+
+		if err == ErrNoData {
+			continue
+		}
 
 		if err != nil {
-			return nil, 0, 0, err
+			return nil, 0, 0, 0, err
 		}
 
-		if data == nil {
+		if n == 0 {
 			data = cdata
 			from = cfrom
 			to = cto
+			n += cn
 			continue
 		}
 
 		if cfrom != from || cto != to {
-			// TODO: Here, we could take the max of cfrom and from and the min of cto and to instead.
-			// This would mean that we also have to resize data.
-			return nil, 0, 0, errors.New("data for metrics at child levels does not align")
+			return nil, 0, 0, 0, ErrDataDoesNotAlign
 		}
 
 		if len(data) != len(cdata) {
 			panic("WTF? Different freq. at different levels?")
 		}
 
-		for i := 0; i < len(data); i++ {
-			data[i] += cdata[i]
+		n += cn
+	}
+
+	if n == 0 {
+		return nil, 0, 0, 0, ErrNoData
+	}
+
+	return data, n, from, to, nil
+}
+
+func (l *level) free(t int64) (int, error) {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+
+	n := 0
+	for _, b := range l.metrics {
+		m, err := b.free(t)
+		n += m
+		if err != nil {
+			return n, err
 		}
 	}
 
-	switch aggregation {
-	case "sum":
-		return data, from, to, nil
-	case "avg":
-		normalize := 1. / Float(len(l.children))
-		for i := 0; i < len(data); i++ {
-			data[i] *= normalize
+	for _, l := range l.children {
+		m, err := l.free(t)
+		n += m
+		if err != nil {
+			return n, err
 		}
-		return data, from, to, nil
-	default:
-		return nil, 0, 0, errors.New("invalid aggregation strategy: " + aggregation)
 	}
+
+	return n, nil
 }
 
 type MemoryStore struct {
@@ -258,7 +300,8 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error
 		if !ok {
 			minfo, ok := m.metrics[metric.Name]
 			if !ok {
-				return errors.New("unkown metric: " + metric.Name)
+				// return errors.New("unkown metric: " + metric.Name)
+				continue
 			}
 
 			// First write to this metric and level
@@ -279,6 +322,9 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error
 	return nil
 }
 
+// Returns all values for metric `metric` from `from` to `to` for the selected level.
+// If the level does not hold the metric itself, the data will be aggregated recursively from the children.
+// See `level.read` for more information.
 func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([]Float, int64, int64, error) {
 	l := m.root.findLevelOrCreate(selector)
 	l.lock.RLock()
@@ -293,5 +339,28 @@ func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([]
 		return nil, 0, 0, errors.New("unkown metric: " + metric)
 	}
 
-	return l.read(metric, from, to, minfo.Aggregation)
+	data := make([]Float, (to-from)/minfo.Frequency)
+	data, n, from, to, err := l.read(metric, from, to, data)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+
+	if n > 1 {
+		if minfo.Aggregation == "avg" {
+			normalize := 1. / Float(n)
+			for i := 0; i < len(data); i++ {
+				data[i] *= normalize
+			}
+		} else if minfo.Aggregation != "sum" {
+			return nil, 0, 0, errors.New("invalid aggregation strategy: " + minfo.Aggregation)
+		}
+	}
+
+	return data, from, to, err
+}
+
+// Release all buffers for the selected level and all its children that contain only
+// values older than `t`.
+func (m *MemoryStore) Free(selector []string, t int64) (int, error) {
+	return m.root.findLevelOrCreate(selector).free(t)
 }
diff --git a/memstore_test.go b/memstore_test.go
index b15d91f..857f61f 100644
--- a/memstore_test.go
+++ b/memstore_test.go
@@ -302,6 +302,56 @@ func TestMemoryStoreArchive(t *testing.T) {
 	}
 }
 
+func TestMemoryStoreFree(t *testing.T) {
+	store := NewMemoryStore(map[string]MetricConfig{
+		"a": {Frequency: 1},
+		"b": {Frequency: 2},
+	})
+
+	count := 3000
+	sel := []string{"cluster", "host", "1"}
+	for i := 0; i < count; i++ {
+		err := store.Write(sel, int64(i), []Metric{
+			{Name: "a", Value: Float(i)},
+			{Name: "b", Value: Float(i)},
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	n, err := store.Free([]string{"cluster", "host"}, int64(BUFFER_CAP*2)+100)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if n != 3 {
+		t.Fatal("three buffers expected to be released")
+	}
+
+	adata, from, to, err := store.Read([]string{"cluster", "host", "1"}, "a", 0, int64(count))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if from != int64(BUFFER_CAP*2) || to != int64(count) || len(adata) != count-2*BUFFER_CAP {
+		t.Fatalf("unexpected values from call to `Read`: from=%d, to=%d, len=%d", from, to, len(adata))
+	}
+
+	bdata, from, to, err := store.Read([]string{"cluster", "host", "1"}, "b", 0, int64(count))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if from != int64(BUFFER_CAP*2) || to != int64(count) || len(bdata) != (count-2*BUFFER_CAP)/2 {
+		t.Fatalf("unexpected values from call to `Read`: from=%d, to=%d, len=%d", from, to, len(bdata))
+	}
+
+	if adata[0] != Float(BUFFER_CAP*2) || adata[len(adata)-1] != Float(count-1) {
+		t.Fatal("wrong values")
+	}
+}
+
 func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) {
 	frequency := int64(5)
 	count := b.N
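
A note on the expected count in TestMemoryStoreFree: each buffer holds up to BUFFER_CAP = 1024 slots, and the free threshold is BUFFER_CAP*2+100 = 2148. Metric "a" (frequency 1) has two completely filled buffers covering [0, 1024) and [1024, 2048), both ending before 2148; metric "b" (frequency 2) has one filled buffer covering [0, 2048). The newest buffer of each chain still reaches past the threshold and is kept, so 2 + 1 = 3 buffers are returned to the pool, which is why the check is n != 3.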
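For orientation, a minimal usage sketch of the API after this patch. It is an illustration only, not part of the change: it assumes it lives in the same package as memstore.go, reuses the existing NewMemoryStore, MetricConfig, Metric and Float types, and the metric name, selectors and timestamps are made up.

	package main // assuming the same package as memstore.go

	import "log"

	// Hypothetical example, not part of the patch: write a metric for two
	// hosts, read it aggregated at the cluster level, then free old buffers.
	func exampleReadAndFree() {
		store := NewMemoryStore(map[string]MetricConfig{
			"flops": {Frequency: 10, Aggregation: "avg"},
		})

		// One value every 10 seconds per host.
		for ts := int64(0); ts < 100; ts += 10 {
			for _, host := range []string{"host1", "host2"} {
				err := store.Write([]string{"cluster", host}, ts, []Metric{
					{Name: "flops", Value: Float(ts)},
				})
				if err != nil {
					log.Fatal(err)
				}
			}
		}

		// The cluster level itself holds no "flops" buffer, so level.read sums
		// both hosts element-wise into one pre-allocated slice; because more
		// than one buffer contributed and Aggregation is "avg", Read divides
		// the sums by the number of contributors.
		data, from, to, err := store.Read([]string{"cluster"}, "flops", 0, 100)
		if err != nil {
			log.Fatal(err)
		}
		log.Println(data, from, to)

		// Release buffers that only hold values older than ts = 50. Nothing is
		// freed here because each chain consists of a single, current buffer.
		n, err := store.Free([]string{"cluster"}, 50)
		if err != nil {
			log.Fatal(err)
		}
		log.Println(n, "buffers released")
	}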