diff --git a/debug.go b/debug.go new file mode 100644 index 0000000..a0b5888 --- /dev/null +++ b/debug.go @@ -0,0 +1,50 @@ +package main + +import ( + "bufio" + "fmt" +) + +func (b *buffer) debugDump(w *bufio.Writer) { + if b.prev != nil { + b.prev.debugDump(w) + } + + end := "" + if b.next != nil { + end = " -> " + } + + to := b.start + b.frequency*int64(len(b.data)) + fmt.Fprintf(w, "buffer(from=%d, len=%d, to=%d)%s", b.start, len(b.data), to, end) +} + +func (l *level) debugDump(w *bufio.Writer, m *MemoryStore, indent string) error { + l.lock.RLock() + defer l.lock.RUnlock() + + for name, minfo := range m.metrics { + b := l.metrics[minfo.offset] + if b != nil { + fmt.Fprintf(w, "%smetric '%s': ", indent, name) + b.debugDump(w) + fmt.Fprint(w, "\n") + } + } + + if l.children != nil { + fmt.Fprintf(w, "%schildren:\n", indent) + for name, lvl := range l.children { + fmt.Fprintf(w, "%s'%s':\n", indent, name) + lvl.debugDump(w, m, "\t"+indent) + } + } + + return nil +} + +func (m *MemoryStore) DebugDump(w *bufio.Writer) error { + fmt.Fprintf(w, "MemoryStore:\n") + m.root.debugDump(w, m, " ") + return w.Flush() +} diff --git a/lineprotocol.go b/lineprotocol.go index 5b3e246..05140d2 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -218,7 +218,7 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx c log.Printf("NATS subscription to 'updates' on '%s' established\n", address) - _ = <-ctx.Done() + <-ctx.Done() err = sub.Unsubscribe() } else { msgs := make(chan *nats.Msg, 16) @@ -251,7 +251,7 @@ func ReceiveNats(address string, handleLine func(line *Line), workers int, ctx c log.Printf("NATS subscription to 'updates' on '%s' established\n", address) - _ = <-ctx.Done() + <-ctx.Done() err = sub.Unsubscribe() close(msgs) wg.Wait() diff --git a/memstore.go b/memstore.go index 534a847..1bc1ce5 100644 --- a/memstore.go +++ b/memstore.go @@ -107,7 +107,12 @@ func (b *buffer) read(from, to int64, data []Float) ([]Float, int64, int64, erro idx = 0 } - if t < b.start || idx >= len(b.data) { + if idx >= len(b.data) { + if b.next == nil || to <= b.next.start { + break + } + data[i] += NaN + } else if t < b.start { data[i] += NaN } else { data[i] += b.data[idx] diff --git a/memstore_test.go b/memstore_test.go index e8a0a94..8967103 100644 --- a/memstore_test.go +++ b/memstore_test.go @@ -61,6 +61,39 @@ func TestMemoryStoreBasics(t *testing.T) { } +func TestMemoryStoreOutOfBounds(t *testing.T) { + count := 2000 + toffset := 1000 + store := NewMemoryStore(map[string]MetricConfig{ + "a": {Frequency: 60}, + }) + + for i := 0; i < count; i++ { + if err := store.Write([]string{"cluster", "host", "cpu"}, int64(toffset+i*60), []Metric{ + {Name: "a", Value: Float(i)}, + }); err != nil { + t.Fatal(err) + } + } + + // store.DebugDump(bufio.NewWriter(os.Stdout)) + + sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu"}} + data, from, to, err := store.Read(sel, "a", 500, int64(toffset+count*60+500)) + if err != nil { + t.Fatal(err) + } + + if from != int64(toffset) || to != int64(toffset+count*60) { + t.Fatalf("Got %d-%d, expected %d-%d", from, to, toffset, toffset+count*60) + } + + if len(data) != count || data[0] != 0 || data[len(data)-1] != Float((count-1)) { + t.Fatalf("Wrong data (got: %d, %f, %f, expected: %d, %f, %f)", + len(data), data[0], data[len(data)-1], count, 0., Float(count-1)) + } +} + func TestMemoryStoreMissingDatapoints(t *testing.T) { count := 3000 store := NewMemoryStore(map[string]MetricConfig{ @@ -88,12 +121,12 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) { return } - if len(adata) != count { + if len(adata) != count-2 { t.Error("unexpected len") return } - for i := 0; i < count; i++ { + for i := 0; i < count-2; i++ { if i%3 == 0 { if adata[i] != Float(i) { t.Error("unexpected value")