Reduce gaps/rewrites in the same cell with offset

A new buffer remembers the timestamp of the first write.
Instead of cutting of cells relative to that time, have
a little "time buffer" so that rewriting the same cell twice
happens less often.
This commit is contained in:
Lou Knauer 2021-12-02 12:57:35 +01:00
parent becf41f98c
commit 5d89d87a2d
2 changed files with 52 additions and 1 deletions

View File

@ -59,7 +59,13 @@ func (b *buffer) write(ts int64, value Float) (*buffer, error) {
return nil, errors.New("cannot write value to buffer from past")
}
idx := int((ts - b.start) / b.frequency)
// When a new buffer is created, it starts at ts. If we would
// use the same index calculation as for a read here, even a very
// slight drift in the timestamps of values will cause cases where
// a cell is re-written. Adding any value smaller than half the frequency
// here creates a time buffer around the cutoff from one cell to the next
// with the same semantics as before.
idx := int((ts - b.start + (b.frequency / 3)) / b.frequency)
if idx >= cap(b.data) {
newbuf := newBuffer(ts, b.frequency)
newbuf.prev = b

View File

@ -61,6 +61,51 @@ func TestMemoryStoreBasics(t *testing.T) {
}
func TestMemoryStoreTooMuchWrites(t *testing.T) {
frequency := int64(10)
count := BUFFER_CAP*3 + 10
store := NewMemoryStore(map[string]MetricConfig{
"a": {Frequency: frequency},
"b": {Frequency: frequency * 2},
"c": {Frequency: frequency / 2},
"d": {Frequency: frequency * 3},
})
start := int64(100)
for i := 0; i < count; i++ {
if err := store.Write([]string{"test"}, start+int64(i)*frequency, []Metric{
{Name: "a", Value: Float(i)},
{Name: "b", Value: Float(i / 2)},
{Name: "c", Value: Float(i * 2)},
{Name: "d", Value: Float(i / 3)},
}); err != nil {
t.Fatal(err)
}
}
end := start + int64(count)*frequency
data, from, to, err := store.Read(Selector{{String: "test"}}, "a", start, end)
if len(data) != count || from != start || to != end || err != nil {
t.Fatalf("a: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
}
data, from, to, err = store.Read(Selector{{String: "test"}}, "b", start, end)
if len(data) != count/2 || from != start || to != end || err != nil {
t.Fatalf("b: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
}
data, from, to, err = store.Read(Selector{{String: "test"}}, "c", start, end)
if len(data) != count*2-1 || from != start || to != end-frequency/2 || err != nil {
t.Fatalf("c: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
}
data, from, to, err = store.Read(Selector{{String: "test"}}, "d", start, end)
if len(data) != count/3+1 || from != start || to != end+frequency*2 || err != nil {
t.Errorf("expected: err=nil, from=%d, to=%d, len(data)=%d\n", start, end+frequency*2, count/3)
t.Fatalf("d: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
}
}
func TestMemoryStoreOutOfBounds(t *testing.T) {
count := 2000
toffset := 1000