mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2024-11-10 05:07:25 +01:00
streaming checkpoints
This commit is contained in:
parent
f153207346
commit
ef6e09c3e2
@ -250,7 +250,7 @@ func encodeUint32(buf []byte, i uint32) []byte {
|
||||
byte((i>>24)&0xff))
|
||||
}
|
||||
|
||||
func decodeBytes(buf []byte, r *bufio.Reader) ([]byte, error) {
|
||||
func decodeBytes(buf []byte, r io.Reader) ([]byte, error) {
|
||||
len, err := decodeUint32(buf, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -267,7 +267,7 @@ func decodeBytes(buf []byte, r *bufio.Reader) ([]byte, error) {
|
||||
return bytes, nil
|
||||
}
|
||||
|
||||
func decodeString(buf []byte, r *bufio.Reader) (string, error) {
|
||||
func decodeString(buf []byte, r io.Reader) (string, error) {
|
||||
len, err := decodeUint32(buf, r)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -286,7 +286,7 @@ func decodeString(buf []byte, r *bufio.Reader) (string, error) {
|
||||
return string(bytes), nil
|
||||
}
|
||||
|
||||
func decodeUint64(buf []byte, r *bufio.Reader) (uint64, error) {
|
||||
func decodeUint64(buf []byte, r io.Reader) (uint64, error) {
|
||||
buf = buf[0:8]
|
||||
_, err := io.ReadFull(r, buf)
|
||||
if err != nil {
|
||||
@ -303,7 +303,7 @@ func decodeUint64(buf []byte, r *bufio.Reader) (uint64, error) {
|
||||
(uint64(buf[7]) << 56), nil
|
||||
}
|
||||
|
||||
func decodeUint32(buf []byte, r *bufio.Reader) (uint32, error) {
|
||||
func decodeUint32(buf []byte, r io.Reader) (uint32, error) {
|
||||
buf = buf[0:4]
|
||||
_, err := io.ReadFull(r, buf)
|
||||
if err != nil {
|
||||
|
@ -16,15 +16,14 @@ type chunk struct {
|
||||
}
|
||||
|
||||
func newChunk(ts, freq int64) *chunk {
|
||||
b := &chunk{}
|
||||
b.frequency = freq
|
||||
b.start = ts - (freq / 2)
|
||||
b.prev = nil
|
||||
b.next = nil
|
||||
b.checkpointed = false
|
||||
b.data = RequestFloatSlice(bufferSizeInFloats)
|
||||
b.data = b.data[:0]
|
||||
return b
|
||||
return &chunk{
|
||||
frequency: freq,
|
||||
start: ts - (freq / 2),
|
||||
prev: nil,
|
||||
next: nil,
|
||||
checkpointed: false,
|
||||
data: RequestFloatSlice(bufferSizeInFloats)[:0],
|
||||
}
|
||||
}
|
||||
|
||||
func freeChunk(c *chunk) {
|
||||
|
@ -91,6 +91,8 @@ func (l *level) free(t int64) (delme bool, n int) {
|
||||
type MemoryStore struct {
|
||||
root level // root of the tree structure
|
||||
// TODO...
|
||||
|
||||
metrics map[string]int // TODO...
|
||||
}
|
||||
|
||||
func (ms *MemoryStore) GetOffset(metric string) int {
|
||||
@ -100,3 +102,7 @@ func (ms *MemoryStore) GetOffset(metric string) int {
|
||||
func (ms *MemoryStore) GetMetricForOffset(offset int) string {
|
||||
return "" // TODO!
|
||||
}
|
||||
|
||||
func (ms *MemoryStore) MinFrequency() int64 {
|
||||
return 10 // TODO
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package memstore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
"unsafe"
|
||||
@ -8,7 +9,29 @@ import (
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/types"
|
||||
)
|
||||
|
||||
func (l *level) streamingCheckpoint(ms *MemoryStore, from, to int64, w io.Writer, buf []byte, metricsbuf []types.Float) ([]byte, error) {
|
||||
// Can be done in parallel with other operations, but is single threaded itself.
|
||||
func (ms *MemoryStore) SaveCheckpoint(from, to int64, w io.Writer) error {
|
||||
// Header:
|
||||
buf := make([]byte, 0, writeBufferSize*2)
|
||||
buf = append(buf, magicValue...)
|
||||
buf = encodeBytes(buf, nil)
|
||||
buf = encodeUint64(buf, uint64(from))
|
||||
buf = encodeUint64(buf, uint64(to))
|
||||
|
||||
metricsbuf := make([]types.Float, 0, (to-from)/ms.MinFrequency()+1)
|
||||
var err error
|
||||
if buf, err = ms.root.saveCheckpoint(ms, from, to, w, buf, metricsbuf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := w.Write(buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *level) saveCheckpoint(ms *MemoryStore, from, to int64, w io.Writer, buf []byte, metricsbuf []types.Float) ([]byte, error) {
|
||||
var err error
|
||||
l.lock.RLock()
|
||||
defer l.lock.RUnlock()
|
||||
@ -52,7 +75,7 @@ func (l *level) streamingCheckpoint(ms *MemoryStore, from, to int64, w io.Writer
|
||||
buf = encodeUint32(buf, uint32(len(l.sublevels)))
|
||||
for key, sublevel := range l.sublevels {
|
||||
buf = encodeString(buf, key)
|
||||
buf, err = sublevel.streamingCheckpoint(ms, from, to, w, buf, metricsbuf)
|
||||
buf, err = sublevel.saveCheckpoint(ms, from, to, w, buf, metricsbuf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -60,3 +83,136 @@ func (l *level) streamingCheckpoint(ms *MemoryStore, from, to int64, w io.Writer
|
||||
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func (ms *MemoryStore) LoadCheckpoint(r io.Reader) error {
|
||||
buf := make([]byte, len(magicValue), 64)
|
||||
if _, err := io.ReadFull(r, buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if string(buf) != magicValue {
|
||||
return fmt.Errorf("file corrupted: expected the file to start with %#v (got %#v)", magicValue, string(buf))
|
||||
}
|
||||
|
||||
if _, err := decodeBytes(buf, r); err != nil { // Reserved
|
||||
return err
|
||||
}
|
||||
if _, err := decodeUint64(buf, r); err != nil { // From
|
||||
return err
|
||||
}
|
||||
if _, err := decodeUint64(buf, r); err != nil { // To
|
||||
return err
|
||||
}
|
||||
|
||||
if err := ms.root.loadCheckpoint(ms, r, buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Blocks all other accesses for this level and all its sublevels!
|
||||
func (l *level) loadCheckpoint(ms *MemoryStore, r io.Reader, buf []byte) error {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
var n uint32
|
||||
var err error
|
||||
var key string
|
||||
if _, err = decodeBytes(buf, r); err != nil { // Reserved...
|
||||
return err
|
||||
}
|
||||
|
||||
// Metrics:
|
||||
if n, err = decodeUint32(buf, r); err != nil {
|
||||
return err
|
||||
}
|
||||
for i := 0; i < int(n); i++ {
|
||||
if key, err = decodeString(buf, r); err != nil {
|
||||
return err
|
||||
}
|
||||
if l.metrics == nil {
|
||||
l.metrics = make([]*chunk, len(ms.metrics))
|
||||
}
|
||||
|
||||
// Metric:
|
||||
if _, err = decodeBytes(buf, r); err != nil { // Reserved...
|
||||
return err
|
||||
}
|
||||
var freq, from uint64
|
||||
if freq, err = decodeUint64(buf, r); err != nil {
|
||||
return err
|
||||
}
|
||||
if from, err = decodeUint64(buf, r); err != nil {
|
||||
return err
|
||||
}
|
||||
numelements, err := decodeUint32(buf, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var x types.Float
|
||||
elmsize := unsafe.Sizeof(x)
|
||||
bytes := RequestBytes(int(elmsize) * int(numelements))
|
||||
if _, err = io.ReadFull(r, bytes); err != nil {
|
||||
return fmt.Errorf("loading metric %#v: %w", key, err)
|
||||
}
|
||||
|
||||
offset := ms.GetOffset(key)
|
||||
if offset == -1 {
|
||||
// Skip unkown metrics
|
||||
ReleaseBytes(bytes)
|
||||
continue
|
||||
}
|
||||
|
||||
sh := (*reflect.SliceHeader)(unsafe.Pointer(&bytes))
|
||||
chunk := &chunk{
|
||||
frequency: int64(freq),
|
||||
start: int64(from),
|
||||
prev: nil,
|
||||
next: nil,
|
||||
data: unsafe.Slice((*types.Float)(unsafe.Pointer(sh.Data)), numelements),
|
||||
checkpointed: true,
|
||||
}
|
||||
|
||||
if prevchunk := l.metrics[offset]; prevchunk != nil {
|
||||
if prevchunk.end() > chunk.start {
|
||||
return fmt.Errorf(
|
||||
"loading metric %#v: loaded checkpoint overlaps with other chunks or is not loaded in correct order (%d - %d)",
|
||||
key, prevchunk.start, chunk.start)
|
||||
}
|
||||
prevchunk.next = chunk
|
||||
chunk.prev = prevchunk
|
||||
l.metrics[offset] = chunk
|
||||
} else {
|
||||
l.metrics[offset] = chunk
|
||||
}
|
||||
}
|
||||
|
||||
// Sublevels:
|
||||
if n, err = decodeUint32(buf, r); err != nil {
|
||||
return err
|
||||
}
|
||||
for i := 0; i < int(n); i++ {
|
||||
if key, err = decodeString(buf, r); err != nil {
|
||||
return err
|
||||
}
|
||||
if l.sublevels == nil {
|
||||
l.sublevels = make(map[string]*level, n)
|
||||
}
|
||||
sublevel, ok := l.sublevels[key]
|
||||
if !ok {
|
||||
sublevel = &level{}
|
||||
}
|
||||
|
||||
if err = sublevel.loadCheckpoint(ms, r, buf); err != nil {
|
||||
return fmt.Errorf("loading sublevel %#v: %w", key, err)
|
||||
}
|
||||
|
||||
if !ok {
|
||||
l.sublevels[key] = sublevel
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
8
internal/types/types.go
Normal file
8
internal/types/types.go
Normal file
@ -0,0 +1,8 @@
|
||||
package types
|
||||
|
||||
type Stats struct {
|
||||
Samples int `json:"samples"`
|
||||
Min Float `json:"min"`
|
||||
Avg Float `json:"avg"`
|
||||
Max Float `json:"max"`
|
||||
}
|
Loading…
Reference in New Issue
Block a user