diff --git a/memstore.go b/memstore.go index 973b869..88964ba 100644 --- a/memstore.go +++ b/memstore.go @@ -59,7 +59,13 @@ func (b *buffer) write(ts int64, value Float) (*buffer, error) { return nil, errors.New("cannot write value to buffer from past") } - idx := int((ts - b.start) / b.frequency) + // When a new buffer is created, it starts at ts. If we would + // use the same index calculation as for a read here, even a very + // slight drift in the timestamps of values will cause cases where + // a cell is re-written. Adding any value smaller than half the frequency + // here creates a time buffer around the cutoff from one cell to the next + // with the same semantics as before. + idx := int((ts - b.start + (b.frequency / 3)) / b.frequency) if idx >= cap(b.data) { newbuf := newBuffer(ts, b.frequency) newbuf.prev = b diff --git a/memstore_test.go b/memstore_test.go index 3b7d759..5c4ab52 100644 --- a/memstore_test.go +++ b/memstore_test.go @@ -61,6 +61,51 @@ func TestMemoryStoreBasics(t *testing.T) { } +func TestMemoryStoreTooMuchWrites(t *testing.T) { + frequency := int64(10) + count := BUFFER_CAP*3 + 10 + store := NewMemoryStore(map[string]MetricConfig{ + "a": {Frequency: frequency}, + "b": {Frequency: frequency * 2}, + "c": {Frequency: frequency / 2}, + "d": {Frequency: frequency * 3}, + }) + + start := int64(100) + for i := 0; i < count; i++ { + if err := store.Write([]string{"test"}, start+int64(i)*frequency, []Metric{ + {Name: "a", Value: Float(i)}, + {Name: "b", Value: Float(i / 2)}, + {Name: "c", Value: Float(i * 2)}, + {Name: "d", Value: Float(i / 3)}, + }); err != nil { + t.Fatal(err) + } + } + + end := start + int64(count)*frequency + data, from, to, err := store.Read(Selector{{String: "test"}}, "a", start, end) + if len(data) != count || from != start || to != end || err != nil { + t.Fatalf("a: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) + } + + data, from, to, err = store.Read(Selector{{String: "test"}}, "b", start, end) + if len(data) != count/2 || from != start || to != end || err != nil { + t.Fatalf("b: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) + } + + data, from, to, err = store.Read(Selector{{String: "test"}}, "c", start, end) + if len(data) != count*2-1 || from != start || to != end-frequency/2 || err != nil { + t.Fatalf("c: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) + } + + data, from, to, err = store.Read(Selector{{String: "test"}}, "d", start, end) + if len(data) != count/3+1 || from != start || to != end+frequency*2 || err != nil { + t.Errorf("expected: err=nil, from=%d, to=%d, len(data)=%d\n", start, end+frequency*2, count/3) + t.Fatalf("d: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) + } +} + func TestMemoryStoreOutOfBounds(t *testing.T) { count := 2000 toffset := 1000