Change out-of-bounds behaviour

This commit is contained in:
Lou Knauer 2021-09-20 10:29:55 +02:00
parent 27a5c0b561
commit 2046415f9c
4 changed files with 93 additions and 5 deletions

50
debug.go Normal file
View File

@ -0,0 +1,50 @@
package main
import (
"bufio"
"fmt"
)
func (b *buffer) debugDump(w *bufio.Writer) {
if b.prev != nil {
b.prev.debugDump(w)
}
end := ""
if b.next != nil {
end = " -> "
}
to := b.start + b.frequency*int64(len(b.data))
fmt.Fprintf(w, "buffer(from=%d, len=%d, to=%d)%s", b.start, len(b.data), to, end)
}
func (l *level) debugDump(w *bufio.Writer, m *MemoryStore, indent string) error {
l.lock.RLock()
defer l.lock.RUnlock()
for name, minfo := range m.metrics {
b := l.metrics[minfo.offset]
if b != nil {
fmt.Fprintf(w, "%smetric '%s': ", indent, name)
b.debugDump(w)
fmt.Fprint(w, "\n")
}
}
if l.children != nil {
fmt.Fprintf(w, "%schildren:\n", indent)
for name, lvl := range l.children {
fmt.Fprintf(w, "%s'%s':\n", indent, name)
lvl.debugDump(w, m, "\t"+indent)
}
}
return nil
}
func (m *MemoryStore) DebugDump(w *bufio.Writer) error {
fmt.Fprintf(w, "MemoryStore:\n")
m.root.debugDump(w, m, " ")
return w.Flush()
}

View File

@ -218,7 +218,7 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx c
log.Printf("NATS subscription to 'updates' on '%s' established\n", address)
_ = <-ctx.Done()
<-ctx.Done()
err = sub.Unsubscribe()
} else {
msgs := make(chan *nats.Msg, 16)
@ -251,7 +251,7 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx c
log.Printf("NATS subscription to 'updates' on '%s' established\n", address)
_ = <-ctx.Done()
<-ctx.Done()
err = sub.Unsubscribe()
close(msgs)
wg.Wait()

View File

@ -107,7 +107,12 @@ func (b *buffer) read(from, to int64, data []Float) ([]Float, int64, int64, erro
idx = 0
}
if t < b.start || idx >= len(b.data) {
if idx >= len(b.data) {
if b.next == nil || to <= b.next.start {
break
}
data[i] += NaN
} else if t < b.start {
data[i] += NaN
} else {
data[i] += b.data[idx]

View File

@ -61,6 +61,39 @@ func TestMemoryStoreBasics(t *testing.T) {
}
func TestMemoryStoreOutOfBounds(t *testing.T) {
count := 2000
toffset := 1000
store := NewMemoryStore(map[string]MetricConfig{
"a": {Frequency: 60},
})
for i := 0; i < count; i++ {
if err := store.Write([]string{"cluster", "host", "cpu"}, int64(toffset+i*60), []Metric{
{Name: "a", Value: Float(i)},
}); err != nil {
t.Fatal(err)
}
}
// store.DebugDump(bufio.NewWriter(os.Stdout))
sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu"}}
data, from, to, err := store.Read(sel, "a", 500, int64(toffset+count*60+500))
if err != nil {
t.Fatal(err)
}
if from != int64(toffset) || to != int64(toffset+count*60) {
t.Fatalf("Got %d-%d, expected %d-%d", from, to, toffset, toffset+count*60)
}
if len(data) != count || data[0] != 0 || data[len(data)-1] != Float((count-1)) {
t.Fatalf("Wrong data (got: %d, %f, %f, expected: %d, %f, %f)",
len(data), data[0], data[len(data)-1], count, 0., Float(count-1))
}
}
func TestMemoryStoreMissingDatapoints(t *testing.T) {
count := 3000
store := NewMemoryStore(map[string]MetricConfig{
@ -88,12 +121,12 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) {
return
}
if len(adata) != count {
if len(adata) != count-2 {
t.Error("unexpected len")
return
}
for i := 0; i < count; i++ {
for i := 0; i < count-2; i++ {
if i%3 == 0 {
if adata[i] != Float(i) {
t.Error("unexpected value")