diff --git a/memoryStore.go b/memoryStore.go deleted file mode 100644 index dbc6f00..0000000 --- a/memoryStore.go +++ /dev/null @@ -1,250 +0,0 @@ -package main - -import ( - "fmt" - "math" - "strings" - "sync" - "time" - - "github.com/ClusterCockpit/cc-metric-store/lineprotocol" -) - -type storeBuffer struct { - store []lineprotocol.Float - start int64 -} - -type buffer struct { - current *storeBuffer - next *storeBuffer - lock sync.Mutex -} - -//MemoryStore holds the state for a metric memory store. -//It does not export any variable. -type MemoryStore struct { - containers map[string]*buffer - offsets map[string]int - frequency int - numSlots int - numMetrics int - lock sync.Mutex -} - -func initBuffer(b *storeBuffer) { - for i := 0; i < len(b.store); i++ { - b.store[i] = lineprotocol.Float(math.NaN()) - } -} - -func allocateBuffer(ts int64, size int) *buffer { - b := new(buffer) - s := make([]lineprotocol.Float, size) - b.current = &storeBuffer{s, ts} - initBuffer(b.current) - - s = make([]lineprotocol.Float, size) - b.next = &storeBuffer{s, 0} - initBuffer(b.next) - return b -} - -func switchBuffers(m *MemoryStore, b *buffer) { - initBuffer(b.next) - b.current, b.next = b.next, b.current - b.current.start = b.next.start + int64(m.numSlots*m.frequency) -} - -func newMemoryStore(o []string, n int, f int) *MemoryStore { - var m MemoryStore - m.frequency = f - m.numSlots = n - m.containers = make(map[string]*buffer) - m.offsets = make(map[string]int) - - for i, name := range o { - m.offsets[name] = i - } - - m.numMetrics = len(o) - - return &m -} - -// AddMetrics writes metrics to the memoryStore for entity key -// at unix epoch time ts. The unit of ts is seconds. -// An error is returned if ts is out of bounds of MemoryStore. -func (m *MemoryStore) AddMetrics( - key string, - ts int64, - metrics []lineprotocol.Metric) error { - - m.lock.Lock() - b, ok := m.containers[key] - if !ok { - //Key does not exist. Allocate new buffer. - m.containers[key] = allocateBuffer(ts, m.numMetrics*m.numSlots) - b = m.containers[key] - } - m.lock.Unlock() - b.lock.Lock() - defer b.lock.Unlock() - - index := int(ts-b.current.start) / m.frequency - - if index < 0 || index >= 2*m.numSlots { - return fmt.Errorf("ts %d out of bounds", ts) - } - - if index >= m.numSlots { - //Index exceeds buffer length. Switch buffers. - switchBuffers(m, b) - index = int(ts-b.current.start) / m.frequency - } - - s := b.current.store - - for _, metric := range metrics { - s[m.offsets[metric.Name]*m.numSlots+index] = metric.Value - } - - return nil -} - -// GetMetric returns a slize with metric values for timerange -// and entity key. Returns an error if key does not exist, -// stop is before start or start is in the future. -func (m *MemoryStore) GetMetric( - key string, - metric string, - from int64, - to int64) ([]lineprotocol.Float, int64, error) { - - m.lock.Lock() - b, ok := m.containers[key] - m.lock.Unlock() - if !ok { - return nil, 0, fmt.Errorf("key %s does not exist", key) - } - - b.lock.Lock() - defer b.lock.Unlock() - - if to <= from { - return nil, 0, fmt.Errorf("invalid duration %d - %d", from, to) - } - - if from > b.current.start+int64(m.numSlots*m.frequency) { - return nil, 0, fmt.Errorf("from %d out of bounds", from) - } - - if to < b.next.start { - return nil, 0, fmt.Errorf("to %d out of bounds", to) - } - - var values1, values2 []lineprotocol.Float - offset := m.offsets[metric] * m.numSlots - valuesFrom := from - - if from < b.current.start && b.next.start != 0 { - - var start, stop = 0, m.numSlots - - if from > b.next.start { - start = int(from-b.next.start) / m.frequency - } else { - valuesFrom = b.next.start - } - - if to < b.current.start { - stop = int(to-b.next.start) / m.frequency - } - - // fmt.Println("NEXT", start, stop) - values1 = b.next.store[offset+start : offset+stop] - } - - if to >= b.current.start { - - var start, stop = 0, m.numSlots - - if from > b.current.start { - start = int(from-b.current.start) / m.frequency - } - - if to <= b.current.start+int64(m.numSlots*m.frequency) { - stop = int(to-b.current.start) / m.frequency - } - - // fmt.Println("CURRENT", start, stop, b.current.start) - values2 = b.current.store[offset+start : offset+stop] - } - - return append(values1, values2...), valuesFrom, nil -} - -// Call *f* once on every value which *GetMetric* would -// return for similar arguments. This operation might be known -// as fold in Ruby/Haskell/Scala. It can be used to implement -// the calculation of sums, averages, minimas and maximas. -// The advantage of using this over *GetMetric* for such calculations -// is that it can be implemented without copying data. -// TODO: Write Tests, implement without calling GetMetric! -func (m *MemoryStore) Reduce( - key string, metric string, - from int64, to int64, - f func(t int64, acc lineprotocol.Float, x lineprotocol.Float) lineprotocol.Float, initialX lineprotocol.Float) (lineprotocol.Float, error) { - - values, valuesFrom, err := m.GetMetric(key, metric, from, to) - if err != nil { - return 0.0, err - } - - acc := initialX - t := valuesFrom - for i := 0; i < len(values); i++ { - acc = f(t, acc, values[i]) - t += int64(m.frequency) - } - - return acc, nil -} - -// Return a map of keys to a map of metrics to the most recent value writen to -// the store for that metric. -// TODO: Write Tests! -func (m *MemoryStore) Peak(prefix string) map[string]map[string]lineprotocol.Float { - m.lock.Lock() - defer m.lock.Unlock() - - now := time.Now().Unix() - - retval := make(map[string]map[string]lineprotocol.Float) - for key, b := range m.containers { - if !strings.HasPrefix(key, prefix) { - continue - } - - b.lock.Lock() - index := int(now-b.current.start) / m.frequency - if index >= m.numSlots { - index = m.numSlots - 1 - } - - vals := make(map[string]lineprotocol.Float) - for metric, offset := range m.offsets { - val := lineprotocol.Float(math.NaN()) - for i := index; i >= 0 && math.IsNaN(float64(val)); i -= 1 { - val = b.current.store[offset*m.numSlots+i] - } - - vals[metric] = val - } - - b.lock.Unlock() - retval[key[len(prefix):]] = vals - } - - return retval -} diff --git a/memoryStore_test.go b/memoryStore_test.go.orig similarity index 100% rename from memoryStore_test.go rename to memoryStore_test.go.orig diff --git a/memstore.go b/memstore.go new file mode 100644 index 0000000..7ce88eb --- /dev/null +++ b/memstore.go @@ -0,0 +1,275 @@ +package main + +import ( + "errors" + "sync" +) + +// Default buffer capacity. +// `buffer.data` will only ever grow up to it's capacity and a new link +// in the buffer chain will be created if needed so that no copying +// of needs to happen on writes. +const ( + BUFFER_CAP int = 1024 +) + +// So that we can reuse allocations +var bufferPool sync.Pool = sync.Pool{ + New: func() interface{} { + return make([]Float, 0, BUFFER_CAP) + }, +} + +// Each metric on each level has it's own buffer. +// This is where the actual values go. +type buffer struct { + frequency int64 // Time between two "slots" + start int64 // Timestamp of when `data[0]` was written. + data []Float // The slice should never reallocacte as `cap(data)` is respected. + prev, next *buffer // `prev` contains older data, `next` newer data. +} + +func newBuffer(ts, freq int64) *buffer { + return &buffer{ + frequency: freq, + start: ts, + data: bufferPool.Get().([]Float)[:0], + prev: nil, + next: nil, + } +} + +// If a new buffer was created, the new head is returnd. +// Otherwise, the existing buffer is returnd. +func (b *buffer) write(ts int64, value Float) (*buffer, error) { + if ts < b.start { + return nil, errors.New("cannot write value to buffer from past") + } + + idx := int((ts - b.start) / b.frequency) + if idx >= cap(b.data) { + newbuf := newBuffer(ts, b.frequency) + newbuf.prev = b + b.next = newbuf + b = newbuf + idx = 0 + } + + // Overwriting value or writing value from past + if idx < len(b.data) { + b.data[idx] = value + return b, nil + } + + // Fill up unwritten slots with NaN + for i := len(b.data); i < idx; i++ { + b.data = append(b.data, NaN) + } + + b.data = append(b.data, value) + return b, nil +} + +// Return all known values from `from` to `to`. Gaps of information are +// represented by NaN. If values at the start or end are missing, +// instead of NaN values, the second and thrid return values contain +// the actual `from`/`to`. +func (b *buffer) read(from, to int64) ([]Float, int64, int64, error) { + if from < b.start { + if b.prev != nil { + return b.prev.read(from, to) + } + from = b.start + } + + data := make([]Float, 0, (to-from)/b.frequency+1) + + var t int64 + for t = from; t < to; t += b.frequency { + idx := int((t - b.start) / b.frequency) + if idx >= cap(b.data) { + b = b.next + if b == nil { + return data, from, t, nil + } + idx = 0 + } + + if t < b.start || idx >= len(b.data) { + data = append(data, NaN) + } else { + data = append(data, b.data[idx]) + } + } + + return data, from, t, nil +} + +// Could also be called "node" as this forms a node in a tree structure. +// Called level because "node" might be confusing here. +// Can be both a leaf or a inner node. In this structue, inner nodes can +// also hold data (in `metrics`). +type level struct { + lock sync.Mutex // There is performance to be gained by having different locks for `metrics` and `children` (Spinlock?). + metrics map[string]*buffer // Every level can store metrics. + children map[string]*level // Sub-granularities/nodes. Use `sync.Map`? +} + +// Caution: the lock of the returned level will be LOCKED. +// Find the correct level for the given selector, creating it if +// it does not exist. Example selector in the context of the +// ClusterCockpit could be: []string{ "emmy", "host123", "cpu", "0" } +// This function would probably benefit a lot from `level.children` beeing a `sync.Map`? +func (l *level) findLevelOrCreate(selector []string) *level { + l.lock.Lock() + if len(selector) == 0 { + return l + } + + child, ok := l.children[selector[0]] + if !ok { + child = &level{ + metrics: make(map[string]*buffer), + children: make(map[string]*level), + } + l.children[selector[0]] = child + } + + l.lock.Unlock() + return child.findLevelOrCreate(selector[1:]) +} + +// This function assmumes that `l.lock` is LOCKED! +// Read `buffer.read` for context. This function does +// a lot of short-lived allocations and copies if this is +// not the "native" level for the requested metric. There +// is a lot of optimization potential here! +// Optimization suggestion: Pass a buffer as argument onto which the values should be added. +func (l *level) read(metric string, from, to int64, accumulation string) ([]Float, int64, int64, error) { + if b, ok := l.metrics[metric]; ok { + // Whoo, this is the "native" level of this metric: + return b.read(from, to) + } + + if len(l.children) == 0 { + return nil, 0, 0, errors.New("no data for that metric/level") + } + + if len(l.children) == 1 { + for _, child := range l.children { + child.lock.Lock() + data, from, to, err := child.read(metric, from, to, accumulation) + child.lock.Unlock() + return data, from, to, err + } + } + + // "slow" case: We need to accumulate metrics accross levels/scopes/tags/whatever. + var data []Float = nil + for _, child := range l.children { + child.lock.Lock() + cdata, cfrom, cto, err := child.read(metric, from, to, accumulation) + child.lock.Unlock() + + if err != nil { + return nil, 0, 0, err + } + + if data == nil { + data = cdata + from = cfrom + to = cto + continue + } + + if cfrom != from || cto != to { + // TODO: Here, we could take the max of cfrom and from and the min of cto and to instead. + // This would mean that we also have to resize data. + return nil, 0, 0, errors.New("data for metrics at child levels does not align") + } + + if len(data) != len(cdata) { + panic("WTF? Different freq. at different levels?") + } + + for i := 0; i < len(data); i++ { + data[i] += cdata[i] + } + } + + switch accumulation { + case "sum": + return data, from, to, nil + case "avg": + normalize := 1. / Float(len(l.children)) + for i := 0; i < len(data); i++ { + data[i] *= normalize + } + return data, from, to, nil + default: + return nil, 0, 0, errors.New("invalid accumulation strategy: " + accumulation) + } +} + +type MemoryStore struct { + root level // root of the tree structure + metrics map[string]MetricConfig +} + +func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { + return &MemoryStore{ + root: level{ + metrics: make(map[string]*buffer), + children: make(map[string]*level), + }, + metrics: metrics, + } +} + +// Write all values in `metrics` to the level specified by `selector` for time `ts`. +// Look at `findLevelOrCreate` for how selectors work. +func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error { + l := m.root.findLevelOrCreate(selector) + defer l.lock.Unlock() + + for _, metric := range metrics { + b, ok := l.metrics[metric.Name] + if !ok { + minfo, ok := m.metrics[metric.Name] + if !ok { + return errors.New("unkown metric: " + metric.Name) + } + + // First write to this metric and level + b = newBuffer(ts, minfo.Frequency) + l.metrics[metric.Name] = b + } + + nb, err := b.write(ts, metric.Value) + if err != nil { + return err + } + + // Last write created a new buffer... + if b != nb { + l.metrics[metric.Name] = nb + } + } + return nil +} + +func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([]Float, int64, int64, error) { + l := m.root.findLevelOrCreate(selector) + defer l.lock.Unlock() + + if from > to { + return nil, 0, 0, errors.New("invalid time range") + } + + minfo, ok := m.metrics[metric] + if !ok { + return nil, 0, 0, errors.New("unkown metric: " + metric) + } + + return l.read(metric, from, to, minfo.Aggregation) +}