test streaming checkpoints

This commit is contained in:
Lou Knauer 2022-07-25 14:04:26 +02:00
parent 1125678bc0
commit a8bc250600
3 changed files with 169 additions and 10 deletions

View File

@@ -5,12 +5,66 @@ import (
"bytes"
"encoding/json"
"log"
"math"
"reflect"
"testing"
"github.com/ClusterCockpit/cc-metric-store/internal/types"
)
func createTestStore(t *testing.T, withData bool) *MemoryStore {
ms := NewMemoryStore(map[string]types.MetricConfig{
"flops": {Frequency: 1},
"membw": {Frequency: 1},
"ipc": {Frequency: 2},
})
if !withData {
return ms
}
n := 1000
sel := []string{"hello", "world"}
for i := 0; i < n; i++ {
if err := ms.Write(sel, int64(i), []types.Metric{
{Name: "flops", Value: types.Float(math.Sin(float64(i) * 0.1))},
}); err != nil {
t.Fatal(err)
}
}
// n := 3000
// x1, x2, x3 := 0.0, 1.1, 2.2
// d1, d2, d3 := 0.05, 0.1, 0.2
// sel1, sel2, sel3 := []string{"cluster"}, []string{"cluster", "host1"}, []string{"cluster", "host2", "cpu0"}
// for i := 0; i < n; i++ {
// ms.Write(sel1, int64(i), []types.Metric{
// {Name: "flops", Value: types.Float(x1)},
// {Name: "membw", Value: types.Float(x2)},
// {Name: "ipc", Value: types.Float(x3)},
// })
// ms.Write(sel2, int64(i), []types.Metric{
// {Name: "flops", Value: types.Float(x1) + 1.},
// {Name: "membw", Value: types.Float(x2) + 2.},
// {Name: "ipc", Value: types.Float(x3) + 3.},
// })
// ms.Write(sel3, int64(i)*2, []types.Metric{
// {Name: "flops", Value: types.Float(x1) + 1.},
// {Name: "membw", Value: types.Float(x2) + 2.},
// {Name: "ipc", Value: types.Float(x3) + 3.},
// })
// x1 += d1
// x2 += d2
// x3 += d3
// }
return ms
}
func TestIntEncoding(t *testing.T) {
buf := make([]byte, 0, 100)
x1 := uint64(0x0102030405060708)
@@ -107,3 +161,45 @@ func TestIdentity(t *testing.T) {
t.Fatal("x != deserialize(serialize(x))")
}
}
func TestStreamingCheckpointIdentity(t *testing.T) {
disk := &bytes.Buffer{}
ms1 := createTestStore(t, true)
if err := ms1.SaveCheckpoint(0, 7000, disk); err != nil {
t.Fatal("saving checkpoint failed: ", err)
}
// fmt.Printf("disk: %#v\n", disk.Bytes())
ms2 := createTestStore(t, false)
if err := ms2.LoadCheckpoint(disk); err != nil {
t.Fatal("loading checkpoint failed: ", err)
}
arr1, from1, to1, err := ms1.Read(types.Selector{{String: "hello"}, {String: "world"}}, "flops", 0, 7000)
if err != nil {
t.Fatal(err)
}
arr2, from2, to2, err := ms2.Read(types.Selector{{String: "hello"}, {String: "world"}}, "flops", 0, 7000)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(arr1, arr2) || from1 != from2 || to1 != to2 {
t.Fatal("x != deserialize(serialize(x))")
}
// if !reflect.DeepEqual(ms1, ms2) {
// // fmt.Printf("ms1.root: %#v\n", ms1.root)
// // fmt.Printf("ms2.root: %#v\n", ms2.root)
// // fmt.Printf("ms1.root.sublevels['hello']: %#v\n", *ms1.root.sublevels["hello"])
// // fmt.Printf("ms2.root.sublevels['hello']: %#v\n", *ms2.root.sublevels["hello"])
// // fmt.Printf("ms1.root.sublevels['hello'].sublevels['world']: %#v\n", *ms1.root.sublevels["hello"].sublevels["world"])
// // fmt.Printf("ms2.root.sublevels['hello'].sublevels['world']: %#v\n", *ms2.root.sublevels["hello"].sublevels["world"])
// // fmt.Printf("ms1.root.sublevels['hello'].sublevels['world'].metrics[0]: %#v\n", *ms1.root.sublevels["hello"].sublevels["world"].metrics[0])
// // fmt.Printf("ms2.root.sublevels['hello'].sublevels['world'].metrics[0]: %#v\n", *ms2.root.sublevels["hello"].sublevels["world"].metrics[0])
// t.Fatal("x != deserialize(serialize(x))")
// }
}
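For reference, the round-trip that TestStreamingCheckpointIdentity performs against a bytes.Buffer can be sketched against a real file. This is an illustration only, based on the signatures exercised above: SaveCheckpoint(from, to int64, w io.Writer) as shown further down, and a LoadCheckpoint that is assumed to accept any io.Reader (the test only passes a *bytes.Buffer). The helper name, file name, and error handling are hypothetical; the snippet would live alongside the test in package memstore with "os" added to the imports.

// Illustrative sketch: persist a checkpoint to a file and restore it into a
// freshly configured store. "checkpoint.bin" and checkpointRoundTrip are
// hypothetical names; LoadCheckpoint taking an io.Reader is an assumption.
func checkpointRoundTrip(ms *MemoryStore, from, to int64) (*MemoryStore, error) {
	f, err := os.Create("checkpoint.bin")
	if err != nil {
		return nil, err
	}
	if err := ms.SaveCheckpoint(from, to, f); err != nil {
		f.Close()
		return nil, err
	}
	if err := f.Close(); err != nil {
		return nil, err
	}

	f, err = os.Open("checkpoint.bin")
	if err != nil {
		return nil, err
	}
	defer f.Close()

	// The receiving store needs the same metric configuration before loading.
	restored := NewMemoryStore(map[string]types.MetricConfig{
		"flops": {Frequency: 1},
		"membw": {Frequency: 1},
		"ipc":   {Frequency: 2},
	})
	if err := restored.LoadCheckpoint(f); err != nil {
		return nil, err
	}
	return restored, nil
}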

View File

@@ -2,6 +2,7 @@ package memstore
import (
"errors"
"math"
"sync"
"github.com/ClusterCockpit/cc-metric-store/internal/types"
@@ -148,11 +149,22 @@ func (ms *MemoryStore) GetMetricConf(metric string) (types.MetricConfig, bool) {
}
func (ms *MemoryStore) GetMetricForOffset(offset int) string {
return "" // TODO!
for name, mc := range ms.metrics {
if mc.Offset == offset {
return name
}
}
return ""
}
func (ms *MemoryStore) MinFrequency() int64 {
return 10 // TODO
min := int64(math.MaxInt64)
for _, mc := range ms.metrics {
if mc.Frequency < min {
min = mc.Frequency
}
}
return min
}
func (m *MemoryStore) GetLevel(selector []string) *Level {
@@ -189,6 +201,35 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric
return nil
}
func (m *MemoryStore) Write(selector []string, ts int64, metrics []types.Metric) error {
l := m.root.findLevelOrCreate(selector, len(m.metrics))
for _, metric := range metrics {
mc, ok := m.GetMetricConf(metric.Name)
if !ok {
continue
}
c := l.metrics[mc.Offset]
if c == nil {
// First write to this metric and level
c = newChunk(ts, mc.Frequency)
l.metrics[mc.Offset] = c
}
nc, err := c.write(ts, metric.Value)
if err != nil {
return err
}
// Last write started a new chunk...
if c != nc {
l.metrics[mc.Offset] = nc
}
}
return nil
}
func (m *MemoryStore) Free(t int64) int {
_, n := m.root.free(t)
return n
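The two helpers added earlier in this file replace TODO stubs: GetMetricForOffset reverse-maps a chunk slot offset to its metric name (used by saveCheckpoint when writing keys), and MinFrequency returns the smallest configured frequency. A hedged snippet, reusing the three-metric configuration from the test store above; printHelperResults is an illustrative name, and obtaining the assigned offset via GetMetricConf mirrors how Write uses it.

// Illustrative snippet (would sit in package memstore, with "fmt" imported).
func printHelperResults() {
	ms := NewMemoryStore(map[string]types.MetricConfig{
		"flops": {Frequency: 1},
		"membw": {Frequency: 1},
		"ipc":   {Frequency: 2},
	})

	// GetMetricConf hands back the config with the slot offset assigned by
	// NewMemoryStore; GetMetricForOffset inverts that mapping.
	mc, _ := ms.GetMetricConf("ipc")
	fmt.Println(ms.GetMetricForOffset(mc.Offset)) // "ipc"

	// MinFrequency scans every configured metric: min(1, 1, 2) == 1.
	fmt.Println(ms.MinFrequency())
}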

View File

@@ -31,6 +31,7 @@ func (ms *MemoryStore) SaveCheckpoint(from, to int64, w io.Writer) error {
return nil
}
// Writes a checkpoint for the current level into buf, flushes buf to w once enough bytes have accumulated, and returns buf.
func (l *Level) saveCheckpoint(ms *MemoryStore, from, to int64, w io.Writer, buf []byte, metricsbuf []types.Float) ([]byte, error) {
var err error
l.lock.RLock()
@@ -39,26 +40,37 @@ func (l *Level) saveCheckpoint(ms *MemoryStore, from, to int64, w io.Writer, buf
buf = encodeBytes(buf, nil) // Reserved
// Metrics:
buf = encodeUint32(buf, uint32(len(l.metrics)))
n := 0
for _, c := range l.metrics {
if c != nil {
n += 1
}
}
buf = encodeUint32(buf, uint32(n))
for i, c := range l.metrics {
if c == nil {
continue
}
key := ms.GetMetricForOffset(i)
buf = encodeString(buf, key)
// Metric
buf = encodeBytes(buf, nil) // Reserved
metricsbuf = metricsbuf[:(to-from)/c.frequency+1]
metrics := metricsbuf[:(to-from)/c.frequency+1]
var cfrom int64
if metricsbuf, cfrom, _, err = c.read(from, to, metricsbuf); err != nil {
if metrics, cfrom, _, err = c.read(from, to, metrics); err != nil {
return nil, err
}
buf = encodeUint64(buf, uint64(c.frequency))
buf = encodeUint64(buf, uint64(cfrom))
buf = encodeUint32(buf, uint32(len(metricsbuf)))
buf = encodeUint32(buf, uint32(len(metrics)))
var x types.Float
elmsize := unsafe.Sizeof(x)
sh := (*reflect.SliceHeader)(unsafe.Pointer(&metricsbuf))
sh := (*reflect.SliceHeader)(unsafe.Pointer(&metrics))
bytes := unsafe.Slice((*byte)(unsafe.Pointer(sh.Data)), sh.Len*int(elmsize))
buf = append(buf, bytes...)
@@ -69,6 +81,13 @@ func (l *Level) saveCheckpoint(ms *MemoryStore, from, to int64, w io.Writer, buf
buf = buf[0:]
}
for c != nil {
if c.end() <= to {
c.checkpointed = true
}
c = c.prev
}
}
// Sublevels:
@@ -127,13 +146,13 @@ func (l *Level) loadCheckpoint(ms *MemoryStore, r io.Reader, buf []byte) error {
if n, err = decodeUint32(buf, r); err != nil {
return err
}
if l.metrics == nil {
l.metrics = make([]*chunk, len(ms.metrics))
}
for i := 0; i < int(n); i++ {
if key, err = decodeString(buf, r); err != nil {
return err
}
if l.metrics == nil {
l.metrics = make([]*chunk, len(ms.metrics))
}
// Metric:
if _, err = decodeBytes(buf, r); err != nil { // Reserved...
@@ -150,6 +169,9 @@ func (l *Level) loadCheckpoint(ms *MemoryStore, r io.Reader, buf []byte) error {
if err != nil {
return err
}
if numelements == 0 {
continue
}
var x types.Float
elmsize := unsafe.Sizeof(x)