From a8bc250600867f4b8923c6c3afcc392ce4c2801e Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 25 Jul 2022 14:04:26 +0200 Subject: [PATCH] test streaming checkpoints --- internal/memstore/checkpoints_test.go | 96 +++++++++++++++++++++++ internal/memstore/memstore.go | 45 ++++++++++- internal/memstore/streaming-checkpoint.go | 38 +++++++-- 3 files changed, 169 insertions(+), 10 deletions(-) diff --git a/internal/memstore/checkpoints_test.go b/internal/memstore/checkpoints_test.go index ff81d14..93e317d 100644 --- a/internal/memstore/checkpoints_test.go +++ b/internal/memstore/checkpoints_test.go @@ -5,12 +5,66 @@ import ( "bytes" "encoding/json" "log" + "math" "reflect" "testing" "github.com/ClusterCockpit/cc-metric-store/internal/types" ) +func createTestStore(t *testing.T, withData bool) *MemoryStore { + ms := NewMemoryStore(map[string]types.MetricConfig{ + "flops": {Frequency: 1}, + "membw": {Frequency: 1}, + "ipc": {Frequency: 2}, + }) + + if !withData { + return ms + } + + n := 1000 + sel := []string{"hello", "world"} + for i := 0; i < n; i++ { + if err := ms.Write(sel, int64(i), []types.Metric{ + {Name: "flops", Value: types.Float(math.Sin(float64(i) * 0.1))}, + }); err != nil { + t.Fatal(err) + } + } + + // n := 3000 + // x1, x2, x3 := 0.0, 1.1, 2.2 + // d1, d2, d3 := 0.05, 0.1, 0.2 + + // sel1, sel2, sel3 := []string{"cluster"}, []string{"cluster", "host1"}, []string{"cluster", "host2", "cpu0"} + // for i := 0; i < n; i++ { + // ms.Write(sel1, int64(i), []types.Metric{ + // {Name: "flops", Value: types.Float(x1)}, + // {Name: "membw", Value: types.Float(x2)}, + // {Name: "ipc", Value: types.Float(x3)}, + // }) + + // ms.Write(sel2, int64(i), []types.Metric{ + // {Name: "flops", Value: types.Float(x1) + 1.}, + // {Name: "membw", Value: types.Float(x2) + 2.}, + // {Name: "ipc", Value: types.Float(x3) + 3.}, + // }) + + // ms.Write(sel3, int64(i)*2, []types.Metric{ + // {Name: "flops", Value: types.Float(x1) + 1.}, + // {Name: "membw", Value: types.Float(x2) + 2.}, + // {Name: "ipc", Value: types.Float(x3) + 3.}, + // }) + + // x1 += d1 + // x2 += d2 + // x3 += d3 + // } + + return ms +} + func TestIntEncoding(t *testing.T) { buf := make([]byte, 0, 100) x1 := uint64(0x0102030405060708) @@ -107,3 +161,45 @@ func TestIdentity(t *testing.T) { t.Fatal("x != deserialize(serialize(x))") } } + +func TestStreamingCheckpointIndentity(t *testing.T) { + disk := &bytes.Buffer{} + ms1 := createTestStore(t, true) + if err := ms1.SaveCheckpoint(0, 7000, disk); err != nil { + t.Fatal("saving checkpoint failed: ", err) + } + + // fmt.Printf("disk: %#v\n", disk.Bytes()) + + ms2 := createTestStore(t, false) + if err := ms2.LoadCheckpoint(disk); err != nil { + t.Fatal("loading checkpoint failed: ", err) + } + + arr1, from1, to1, err := ms1.Read(types.Selector{{String: "hello"}, {String: "world"}}, "flops", 0, 7000) + if err != nil { + t.Fatal(err) + } + + arr2, from2, to2, err := ms2.Read(types.Selector{{String: "hello"}, {String: "world"}}, "flops", 0, 7000) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(arr1, arr2) || from1 != from2 || to1 != to2 { + t.Fatal("x != deserialize(serialize(x))") + } + + // if !reflect.DeepEqual(ms1, ms2) { + // // fmt.Printf("ms1.root: %#v\n", ms1.root) + // // fmt.Printf("ms2.root: %#v\n", ms2.root) + // // fmt.Printf("ms1.root.sublevels['hello']: %#v\n", *ms1.root.sublevels["hello"]) + // // fmt.Printf("ms2.root.sublevels['hello']: %#v\n", *ms2.root.sublevels["hello"]) + // // fmt.Printf("ms1.root.sublevels['hello'].sublevels['world']: %#v\n", *ms1.root.sublevels["hello"].sublevels["world"]) + // // fmt.Printf("ms2.root.sublevels['hello'].sublevels['world']: %#v\n", *ms2.root.sublevels["hello"].sublevels["world"]) + // // fmt.Printf("ms1.root.sublevels['hello'].sublevels['world'].metrics[0]: %#v\n", *ms1.root.sublevels["hello"].sublevels["world"].metrics[0]) + // // fmt.Printf("ms2.root.sublevels['hello'].sublevels['world'].metrics[0]: %#v\n", *ms2.root.sublevels["hello"].sublevels["world"].metrics[0]) + + // t.Fatal("x != deserialize(serialize(x))") + // } +} diff --git a/internal/memstore/memstore.go b/internal/memstore/memstore.go index a87147c..bebe595 100644 --- a/internal/memstore/memstore.go +++ b/internal/memstore/memstore.go @@ -2,6 +2,7 @@ package memstore import ( "errors" + "math" "sync" "github.com/ClusterCockpit/cc-metric-store/internal/types" @@ -148,11 +149,22 @@ func (ms *MemoryStore) GetMetricConf(metric string) (types.MetricConfig, bool) { } func (ms *MemoryStore) GetMetricForOffset(offset int) string { - return "" // TODO! + for name, mc := range ms.metrics { + if mc.Offset == offset { + return name + } + } + return "" } func (ms *MemoryStore) MinFrequency() int64 { - return 10 // TODO + min := int64(math.MaxInt64) + for _, mc := range ms.metrics { + if mc.Frequency < min { + min = mc.Frequency + } + } + return min } func (m *MemoryStore) GetLevel(selector []string) *Level { @@ -189,6 +201,35 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric return nil } +func (m *MemoryStore) Write(selector []string, ts int64, metrics []types.Metric) error { + l := m.root.findLevelOrCreate(selector, len(m.metrics)) + for _, metric := range metrics { + mc, ok := m.GetMetricConf(metric.Name) + if !ok { + continue + } + + c := l.metrics[mc.Offset] + if c == nil { + // First write to this metric and level + c = newChunk(ts, mc.Frequency) + l.metrics[mc.Offset] = c + } + + nc, err := c.write(ts, metric.Value) + if err != nil { + return err + } + + // Last write started a new chunk... + if c != nc { + l.metrics[mc.Offset] = nc + } + } + + return nil +} + func (m *MemoryStore) Free(t int64) int { _, n := m.root.free(t) return n diff --git a/internal/memstore/streaming-checkpoint.go b/internal/memstore/streaming-checkpoint.go index f391c19..820ecd0 100644 --- a/internal/memstore/streaming-checkpoint.go +++ b/internal/memstore/streaming-checkpoint.go @@ -31,6 +31,7 @@ func (ms *MemoryStore) SaveCheckpoint(from, to int64, w io.Writer) error { return nil } +// Writes a checkpoint for the current level to buf, buf to w if enough bytes are reached, and returns buf. func (l *Level) saveCheckpoint(ms *MemoryStore, from, to int64, w io.Writer, buf []byte, metricsbuf []types.Float) ([]byte, error) { var err error l.lock.RLock() @@ -39,26 +40,37 @@ func (l *Level) saveCheckpoint(ms *MemoryStore, from, to int64, w io.Writer, buf buf = encodeBytes(buf, nil) // Reserved // Metrics: - buf = encodeUint32(buf, uint32(len(l.metrics))) + n := 0 + for _, c := range l.metrics { + if c != nil { + n += 1 + } + } + + buf = encodeUint32(buf, uint32(n)) for i, c := range l.metrics { + if c == nil { + continue + } + key := ms.GetMetricForOffset(i) buf = encodeString(buf, key) // Metric buf = encodeBytes(buf, nil) // Reserved - metricsbuf = metricsbuf[:(to-from)/c.frequency+1] + metrics := metricsbuf[:(to-from)/c.frequency+1] var cfrom int64 - if metricsbuf, cfrom, _, err = c.read(from, to, metricsbuf); err != nil { + if metrics, cfrom, _, err = c.read(from, to, metrics); err != nil { return nil, err } buf = encodeUint64(buf, uint64(c.frequency)) buf = encodeUint64(buf, uint64(cfrom)) - buf = encodeUint32(buf, uint32(len(metricsbuf))) + buf = encodeUint32(buf, uint32(len(metrics))) var x types.Float elmsize := unsafe.Sizeof(x) - sh := (*reflect.SliceHeader)(unsafe.Pointer(&metricsbuf)) + sh := (*reflect.SliceHeader)(unsafe.Pointer(&metrics)) bytes := unsafe.Slice((*byte)(unsafe.Pointer(sh.Data)), sh.Len*int(elmsize)) buf = append(buf, bytes...) @@ -69,6 +81,13 @@ func (l *Level) saveCheckpoint(ms *MemoryStore, from, to int64, w io.Writer, buf buf = buf[0:] } + + for c != nil { + if c.end() <= to { + c.checkpointed = true + } + c = c.prev + } } // Sublevels: @@ -127,13 +146,13 @@ func (l *Level) loadCheckpoint(ms *MemoryStore, r io.Reader, buf []byte) error { if n, err = decodeUint32(buf, r); err != nil { return err } + if l.metrics == nil { + l.metrics = make([]*chunk, len(ms.metrics)) + } for i := 0; i < int(n); i++ { if key, err = decodeString(buf, r); err != nil { return err } - if l.metrics == nil { - l.metrics = make([]*chunk, len(ms.metrics)) - } // Metric: if _, err = decodeBytes(buf, r); err != nil { // Reserved... @@ -150,6 +169,9 @@ func (l *Level) loadCheckpoint(ms *MemoryStore, r io.Reader, buf []byte) error { if err != nil { return err } + if numelements == 0 { + continue + } var x types.Float elmsize := unsafe.Sizeof(x)