diff --git a/internal/memstore/checkpoints.go b/internal/memstore/checkpoints.go
index eb79d75..64bfb91 100644
--- a/internal/memstore/checkpoints.go
+++ b/internal/memstore/checkpoints.go
@@ -250,7 +250,7 @@ func encodeUint32(buf []byte, i uint32) []byte {
 		byte((i>>24)&0xff))
 }
 
-func decodeBytes(buf []byte, r *bufio.Reader) ([]byte, error) {
+func decodeBytes(buf []byte, r io.Reader) ([]byte, error) {
 	len, err := decodeUint32(buf, r)
 	if err != nil {
 		return nil, err
@@ -267,7 +267,7 @@ func decodeBytes(buf []byte, r *bufio.Reader) ([]byte, error) {
 	return bytes, nil
 }
 
-func decodeString(buf []byte, r *bufio.Reader) (string, error) {
+func decodeString(buf []byte, r io.Reader) (string, error) {
 	len, err := decodeUint32(buf, r)
 	if err != nil {
 		return "", err
@@ -286,7 +286,7 @@ func decodeString(buf []byte, r *bufio.Reader) (string, error) {
 	return string(bytes), nil
 }
 
-func decodeUint64(buf []byte, r *bufio.Reader) (uint64, error) {
+func decodeUint64(buf []byte, r io.Reader) (uint64, error) {
 	buf = buf[0:8]
 	_, err := io.ReadFull(r, buf)
 	if err != nil {
@@ -303,7 +303,7 @@ func decodeUint64(buf []byte, r *bufio.Reader) (uint64, error) {
 		(uint64(buf[7]) << 56), nil
 }
 
-func decodeUint32(buf []byte, r *bufio.Reader) (uint32, error) {
+func decodeUint32(buf []byte, r io.Reader) (uint32, error) {
 	buf = buf[0:4]
 	_, err := io.ReadFull(r, buf)
 	if err != nil {
diff --git a/internal/memstore/chunks.go b/internal/memstore/chunks.go
index 54449ca..a0ce97a 100644
--- a/internal/memstore/chunks.go
+++ b/internal/memstore/chunks.go
@@ -16,15 +16,14 @@ type chunk struct {
 }
 
 func newChunk(ts, freq int64) *chunk {
-	b := &chunk{}
-	b.frequency = freq
-	b.start = ts - (freq / 2)
-	b.prev = nil
-	b.next = nil
-	b.checkpointed = false
-	b.data = RequestFloatSlice(bufferSizeInFloats)
-	b.data = b.data[:0]
-	return b
+	return &chunk{
+		frequency:    freq,
+		start:        ts - (freq / 2),
+		prev:         nil,
+		next:         nil,
+		checkpointed: false,
+		data:         RequestFloatSlice(bufferSizeInFloats)[:0],
+	}
 }
 
 func freeChunk(c *chunk) {
diff --git a/internal/memstore/memstore.go b/internal/memstore/memstore.go
index f4d2429..0171e92 100644
--- a/internal/memstore/memstore.go
+++ b/internal/memstore/memstore.go
@@ -91,6 +91,8 @@ func (l *level) free(t int64) (delme bool, n int) {
 type MemoryStore struct {
 	root level // root of the tree structure
 	// TODO...
+
+	metrics map[string]int // TODO...
 }
 
 func (ms *MemoryStore) GetOffset(metric string) int {
@@ -100,3 +102,7 @@ func (ms *MemoryStore) GetOffset(metric string) int {
 func (ms *MemoryStore) GetMetricForOffset(offset int) string {
 	return "" // TODO!
 }
+
+func (ms *MemoryStore) MinFrequency() int64 {
+	return 10 // TODO
+}
diff --git a/internal/memstore/streaming-checkpoint.go b/internal/memstore/streaming-checkpoint.go
index 6ee2f7f..2895f69 100644
--- a/internal/memstore/streaming-checkpoint.go
+++ b/internal/memstore/streaming-checkpoint.go
@@ -1,6 +1,7 @@
 package memstore
 
 import (
+	"fmt"
 	"io"
 	"reflect"
 	"unsafe"
@@ -8,7 +9,29 @@ import (
 	"github.com/ClusterCockpit/cc-metric-store/internal/types"
 )
 
-func (l *level) streamingCheckpoint(ms *MemoryStore, from, to int64, w io.Writer, buf []byte, metricsbuf []types.Float) ([]byte, error) {
+// Can be done in parallel with other operations, but is single threaded itself.
+func (ms *MemoryStore) SaveCheckpoint(from, to int64, w io.Writer) error {
+	// Header:
+	buf := make([]byte, 0, writeBufferSize*2)
+	buf = append(buf, magicValue...)
+	buf = encodeBytes(buf, nil)
+	buf = encodeUint64(buf, uint64(from))
+	buf = encodeUint64(buf, uint64(to))
+
+	metricsbuf := make([]types.Float, 0, (to-from)/ms.MinFrequency()+1)
+	var err error
+	if buf, err = ms.root.saveCheckpoint(ms, from, to, w, buf, metricsbuf); err != nil {
+		return err
+	}
+
+	if _, err := w.Write(buf); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (l *level) saveCheckpoint(ms *MemoryStore, from, to int64, w io.Writer, buf []byte, metricsbuf []types.Float) ([]byte, error) {
 	var err error
 	l.lock.RLock()
 	defer l.lock.RUnlock()
@@ -52,7 +75,7 @@ func (l *level) streamingCheckpoint(ms *MemoryStore, from, to int64, w io.Writer
 	buf = encodeUint32(buf, uint32(len(l.sublevels)))
 	for key, sublevel := range l.sublevels {
 		buf = encodeString(buf, key)
-		buf, err = sublevel.streamingCheckpoint(ms, from, to, w, buf, metricsbuf)
+		buf, err = sublevel.saveCheckpoint(ms, from, to, w, buf, metricsbuf)
 		if err != nil {
 			return nil, err
 		}
@@ -60,3 +83,136 @@ func (l *level) streamingCheckpoint(ms *MemoryStore, from, to int64, w io.Writer
 
 	return buf, nil
 }
+
+func (ms *MemoryStore) LoadCheckpoint(r io.Reader) error {
+	buf := make([]byte, len(magicValue), 64)
+	if _, err := io.ReadFull(r, buf); err != nil {
+		return err
+	}
+
+	if string(buf) != magicValue {
+		return fmt.Errorf("file corrupted: expected the file to start with %#v (got %#v)", magicValue, string(buf))
+	}
+
+	if _, err := decodeBytes(buf, r); err != nil { // Reserved
+		return err
+	}
+	if _, err := decodeUint64(buf, r); err != nil { // From
+		return err
+	}
+	if _, err := decodeUint64(buf, r); err != nil { // To
+		return err
+	}
+
+	if err := ms.root.loadCheckpoint(ms, r, buf); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Blocks all other accesses for this level and all its sublevels!
+func (l *level) loadCheckpoint(ms *MemoryStore, r io.Reader, buf []byte) error {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+
+	var n uint32
+	var err error
+	var key string
+	if _, err = decodeBytes(buf, r); err != nil { // Reserved...
+		return err
+	}
+
+	// Metrics:
+	if n, err = decodeUint32(buf, r); err != nil {
+		return err
+	}
+	for i := 0; i < int(n); i++ {
+		if key, err = decodeString(buf, r); err != nil {
+			return err
+		}
+		if l.metrics == nil {
+			l.metrics = make([]*chunk, len(ms.metrics))
+		}
+
+		// Metric:
+		if _, err = decodeBytes(buf, r); err != nil { // Reserved...
+			return err
+		}
+		var freq, from uint64
+		if freq, err = decodeUint64(buf, r); err != nil {
+			return err
+		}
+		if from, err = decodeUint64(buf, r); err != nil {
+			return err
+		}
+		numelements, err := decodeUint32(buf, r)
+		if err != nil {
+			return err
+		}
+
+		var x types.Float
+		elmsize := unsafe.Sizeof(x)
+		bytes := RequestBytes(int(elmsize) * int(numelements))
+		if _, err = io.ReadFull(r, bytes); err != nil {
+			return fmt.Errorf("loading metric %#v: %w", key, err)
+		}
+
+		offset := ms.GetOffset(key)
+		if offset == -1 {
+			// Skip unknown metrics
+			ReleaseBytes(bytes)
+			continue
+		}
+
+		sh := (*reflect.SliceHeader)(unsafe.Pointer(&bytes))
+		chunk := &chunk{
+			frequency:    int64(freq),
+			start:        int64(from),
+			prev:         nil,
+			next:         nil,
+			data:         unsafe.Slice((*types.Float)(unsafe.Pointer(sh.Data)), numelements),
+			checkpointed: true,
+		}
+
+		if prevchunk := l.metrics[offset]; prevchunk != nil {
+			if prevchunk.end() > chunk.start {
+				return fmt.Errorf(
+					"loading metric %#v: loaded checkpoint overlaps with other chunks or is not loaded in correct order (%d - %d)",
+					key, prevchunk.start, chunk.start)
+			}
+			prevchunk.next = chunk
+			chunk.prev = prevchunk
+			l.metrics[offset] = chunk
+		} else {
+			l.metrics[offset] = chunk
+		}
+	}
+
+	// Sublevels:
+	if n, err = decodeUint32(buf, r); err != nil {
+		return err
+	}
+	for i := 0; i < int(n); i++ {
+		if key, err = decodeString(buf, r); err != nil {
+			return err
+		}
+		if l.sublevels == nil {
+			l.sublevels = make(map[string]*level, n)
+		}
+		sublevel, ok := l.sublevels[key]
+		if !ok {
+			sublevel = &level{}
+		}
+
+		if err = sublevel.loadCheckpoint(ms, r, buf); err != nil {
+			return fmt.Errorf("loading sublevel %#v: %w", key, err)
+		}
+
+		if !ok {
+			l.sublevels[key] = sublevel
+		}
+	}
+
+	return nil
+}
diff --git a/internal/types/types.go b/internal/types/types.go
new file mode 100644
index 0000000..efa9ce1
--- /dev/null
+++ b/internal/types/types.go
@@ -0,0 +1,8 @@
+package types
+
+type Stats struct {
+	Samples int   `json:"samples"`
+	Min     Float `json:"min"`
+	Avg     Float `json:"avg"`
+	Max     Float `json:"max"`
+}
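
For reviewers who want to exercise the new entry points, a minimal save/load round-trip sketch follows. It is not part of the patch: it assumes a *memstore.MemoryStore that has already been created and populated elsewhere (no constructor is touched by this diff), it must live inside this module (internal/ packages are not importable from outside), and the file name and time range are placeholders.

// checkpoint_example.go — illustrative sketch only, not part of the diff above.
package example

import (
	"bufio"
	"os"
	"time"

	"github.com/ClusterCockpit/cc-metric-store/internal/memstore"
)

// RoundTrip writes a checkpoint covering the last hour and immediately loads
// it back. The MemoryStore is assumed to be set up and filled elsewhere.
func RoundTrip(ms *memstore.MemoryStore) error {
	to := time.Now().Unix()
	from := to - 3600 // placeholder time range

	f, err := os.Create("checkpoint.bin") // placeholder path
	if err != nil {
		return err
	}
	// SaveCheckpoint takes a plain io.Writer, so an *os.File works directly.
	if err := ms.SaveCheckpoint(from, to, f); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}

	g, err := os.Open("checkpoint.bin")
	if err != nil {
		return err
	}
	defer g.Close()
	// The decode helpers now accept any io.Reader instead of *bufio.Reader,
	// so wrapping the file in a bufio.Reader is optional; it is done here
	// only to batch the many small header reads.
	return ms.LoadCheckpoint(bufio.NewReader(g))
}

Switching the decode helpers to io.Reader is what makes this flexible: callers can hand LoadCheckpoint a raw file, a buffered reader, or a decompressing reader without changing the checkpoint code.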