package main

import (
	"errors"
	"sync"
)

// Default buffer capacity.
// `buffer.data` will only ever grow up to its capacity and a new link
// in the buffer chain will be created if needed so that no copying
// of data or reallocation needs to happen on writes.
const (
	BUFFER_CAP int = 512
)
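// For illustration (hypothetical frequency, not part of the original notes):
// with BUFFER_CAP = 512 and a metric frequency of 10 seconds, one buffer
// covers 512 * 10s = 5120s, roughly 85 minutes, before a new chain link
// has to be allocated.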

// So that we can reuse allocations
var bufferPool sync.Pool = sync.Pool{
	New: func() interface{} {
		return &buffer{
			data: make([]Float, 0, BUFFER_CAP),
		}
	},
}

var (
	ErrNoData           error = errors.New("no data for this metric/level")
	ErrDataDoesNotAlign error = errors.New("data from lower granularities does not align")
)

// Each metric on each level has its own buffer.
// This is where the actual values go.
// If `cap(data)` is reached, a new buffer is created and
// becomes the new head of a buffer list.
type buffer struct {
	frequency  int64   // Time between two "slots"
	start      int64   // Timestamp of when `data[0]` was written.
	data       []Float // The slice should never reallocate as `cap(data)` is respected.
	prev, next *buffer // `prev` contains older data, `next` newer data.
	archived   bool    // If true, this buffer is already archived

	closed bool
	/*
		statistics struct {
			samples int
			min     Float
			max     Float
			avg     Float
		}
	*/
}
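// Rough sketch of a buffer chain (oldest data on the left, new writes go
// to the head on the right):
//
//	nil <- [oldest] <-> [older] <-> [head] -> nil
//	        (linked via the prev/next pointers above)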

func newBuffer(ts, freq int64) *buffer {
	b := bufferPool.Get().(*buffer)
	b.frequency = freq
	b.start = ts
	b.prev = nil
	b.next = nil
	b.archived = false
	b.closed = false
	b.data = b.data[:0]
	return b
}

// If a new buffer was created, the new head is returned.
// Otherwise, the existing buffer is returned.
// Normally, only "newer" data should be written, but if the value would
// end up in the same buffer anyway, it is allowed.
func (b *buffer) write(ts int64, value Float) (*buffer, error) {
	if ts < b.start {
		return nil, errors.New("cannot write value to buffer from past")
	}

	// When a new buffer is created, it starts at ts. If we would
	// use the same index calculation as for a read here, even a very
	// slight drift in the timestamps of values will cause cases where
	// a cell is re-written. Adding any value smaller than half the frequency
	// here creates a time buffer around the cutoff from one cell to the next
	// with the same semantics as before.
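	// Worked example (made-up numbers): with b.start = 1000 and b.frequency = 10,
	// a value at ts = 1013 maps to idx = (1013 - 1000 + 10/3) / 10 = 1,
	// while ts = 1006 still maps to idx = (1006 - 1000 + 10/3) / 10 = 0.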
	idx := int((ts - b.start + (b.frequency / 3)) / b.frequency)
	if idx >= cap(b.data) {
		newbuf := newBuffer(ts, b.frequency)
		newbuf.prev = b
		b.next = newbuf
		b.close()
		b = newbuf
		idx = 0
	}

	// Overwriting value or writing value from past
	if idx < len(b.data) {
		b.data[idx] = value
		return b, nil
	}

	// Fill up unwritten slots with NaN
	for i := len(b.data); i < idx; i++ {
		b.data = append(b.data, NaN)
	}

	b.data = append(b.data, value)
	return b, nil
}
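// Minimal usage sketch (timestamps, frequency and values are made up):
//
//	b := newBuffer(1000, 10)
//	b, _ = b.write(1000, 42.0) // slot 0
//	b, _ = b.write(1010, 43.0) // slot 1
//	// Once cap(b.data) slots are used up, write returns a new head buffer
//	// chained to this one via prev/next.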

func (b *buffer) end() int64 {
	return b.start + int64(len(b.data))*b.frequency
}

func (b *buffer) close() {}

/*
func (b *buffer) close() {
	if b.closed {
		return
	}

	b.closed = true
	n, sum, min, max := 0, 0., math.MaxFloat64, -math.MaxFloat64
	for _, x := range b.data {
		if x.IsNaN() {
			continue
		}

		n += 1
		f := float64(x)
		sum += f
		min = math.Min(min, f)
		max = math.Max(max, f)
	}

	b.statistics.samples = n
	if n > 0 {
		b.statistics.avg = Float(sum / float64(n))
		b.statistics.min = Float(min)
		b.statistics.max = Float(max)
	} else {
		b.statistics.avg = NaN
		b.statistics.min = NaN
		b.statistics.max = NaN
	}
}
*/

// func interpolate(idx int, data []Float) Float {
// 	if idx == 0 || idx+1 == len(data) {
// 		return NaN
// 	}
// 	return (data[idx-1] + data[idx+1]) / 2.0
// }

// Return all known values from `from` to `to`. Gaps of information are represented as NaN.
// Simple linear interpolation is done between the two neighboring cells if possible
// (currently disabled; see the commented-out interpolate helper above).
// If values at the start or end are missing, instead of NaN values, the second and third
// return values contain the actual `from`/`to`.
// This function goes back the buffer chain if `from` is older than the current buffer's start.
// The loaded values are added to `data` and `data` is returned, possibly with a shorter length.
// If `data` is not long enough to hold all values, this function will panic!
func (b *buffer) read(from, to int64, data []Float) ([]Float, int64, int64, error) {
	if from < b.start {
		if b.prev != nil {
			return b.prev.read(from, to, data)
		}
		from = b.start
	}

	var i int = 0
	var t int64 = from
	for ; t < to; t += b.frequency {
		idx := int((t - b.start) / b.frequency)
		if idx >= cap(b.data) {
			if b.next == nil {
				break
			}
			b = b.next
			idx = 0
		}

		if idx >= len(b.data) {
			if b.next == nil || to <= b.next.start {
				break
			}
			data[i] += NaN
		} else if t < b.start {
			data[i] += NaN
			// } else if b.data[idx].IsNaN() {
			// 	data[i] += interpolate(idx, b.data)
		} else {
			data[i] += b.data[idx]
		}
		i++
	}

	return data[:i], from, t, nil
}

// Free all buffers in the chain that contain only data
// older than `t`.
func (b *buffer) free(t int64) (int, error) {
	end := b.end()
	if end < t && b.next != nil {
		b.next.prev = nil
		n := 0
		for b != nil {
			prev := b.prev
			if prev != nil && prev.start > b.start {
				panic("time travel?")
			}

			n += 1
			// Buffers that come from the
			// archive should not be reused.
			if cap(b.data) == BUFFER_CAP {
				bufferPool.Put(b)
			}
			b = prev
		}
		return n, nil
	}

	if b.prev != nil {
		return b.prev.free(t)
	}

	return 0, nil
}

// Call `callback` on every buffer that contains data in the range from `from` to `to`.
func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) error {
	if b == nil {
		return nil
	}

	if err := b.prev.iterFromTo(from, to, callback); err != nil {
		return err
	}

	if from <= b.end() && b.start <= to {
		return callback(b)
	}

	return nil
}

// Could also be called "node" as this forms a node in a tree structure.
// Called level because "node" might be confusing here.
// Can be both a leaf or an inner node. In this tree structure, inner nodes can
// also hold data (in `metrics`).
type level struct {
	lock     sync.RWMutex
	metrics  []*buffer         // Every level can store metrics.
	children map[string]*level // Lower levels.
}

// Find the correct level for the given selector, creating it if
// it does not exist. Example selector in the context of the
// ClusterCockpit could be: []string{ "emmy", "host123", "cpu0" }.
// This function would probably benefit a lot from `level.children` being a `sync.Map`?
// If nMetrics is -1, do not create new levels.
func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level {
	if len(selector) == 0 {
		return l
	}

	// Allow concurrent reads:
	l.lock.RLock()
	var child *level
	var ok bool
	if l.children == nil {
		// Children map needs to be created...
		l.lock.RUnlock()
		if nMetrics == -1 {
			return nil
		}
	} else {
		child, ok := l.children[selector[0]]
		l.lock.RUnlock()
		if ok {
			return child.findLevelOrCreate(selector[1:], nMetrics)
		}
	}

	// The level does not exist, take write lock for unique access:
	l.lock.Lock()
	// While this thread waited for the write lock, another thread
	// could have created the child node.
	if l.children != nil {
		child, ok = l.children[selector[0]]
		if ok {
			l.lock.Unlock()
			return child.findLevelOrCreate(selector[1:], nMetrics)
		}
	}

	child = &level{
		metrics:  make([]*buffer, nMetrics),
		children: nil,
	}

	if l.children != nil {
		l.children[selector[0]] = child
	} else {
		l.children = map[string]*level{selector[0]: child}
	}
	l.lock.Unlock()
	return child.findLevelOrCreate(selector[1:], nMetrics)
}
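// Usage sketch (cluster/host/cpu names and metric count are made up):
//
//	lvl := root.findLevelOrCreate([]string{"emmy", "host123", "cpu0"}, 10)
//	// The "emmy", "host123" and "cpu0" levels are created on first use;
//	// later calls with the same selector return the same *level.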

// For aggregation over multiple values at different cpus/sockets/..., not time!
type AggregationStrategy int

const (
	NoAggregation AggregationStrategy = iota
	SumAggregation
	AvgAggregation
)

type metricInfo struct {
	offset      int
	aggregation AggregationStrategy
	frequency   int64
}

type MemoryStore struct {
	root    level // root of the tree structure
	metrics map[string]metricInfo
}

// Return a new, initialized instance of a MemoryStore.
// Will panic if values in the metric configurations are invalid.
func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore {
	ms := make(map[string]metricInfo)

	offset := 0
	for key, config := range metrics {
		aggregation := NoAggregation
		if config.Aggregation == "sum" {
			aggregation = SumAggregation
		} else if config.Aggregation == "avg" {
			aggregation = AvgAggregation
		} else if config.Aggregation != "" {
			panic("invalid aggregation strategy: " + config.Aggregation)
		}

		if config.Frequency == 0 {
			panic("invalid frequency")
		}

		ms[key] = metricInfo{
			offset:      offset,
			aggregation: aggregation,
			frequency:   config.Frequency,
		}

		offset += 1
	}

	return &MemoryStore{
		root: level{
			metrics:  make([]*buffer, len(metrics)),
			children: make(map[string]*level),
		},
		metrics: ms,
	}
}
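// Construction sketch (metric names and frequencies are made up; MetricConfig
// is defined elsewhere in this package):
//
//	store := NewMemoryStore(map[string]MetricConfig{
//		"flops":    {Frequency: 10, Aggregation: "sum"},
//		"mem_used": {Frequency: 10},
//	})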

// Write all values in `metrics` to the level specified by `selector` for time `ts`.
// Look at `findLevelOrCreate` for how selectors work.
func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error {
	var ok bool
	for i, metric := range metrics {
		if metric.minfo.frequency == 0 {
			metric.minfo, ok = m.metrics[metric.Name]
			if !ok {
				metric.minfo.frequency = 0
			}
			metrics[i] = metric
		}
	}

	return m.WriteToLevel(&m.root, selector, ts, metrics)
}
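// Write usage sketch (selector, metric name and value are made up):
//
//	err := store.Write([]string{"emmy", "host123"}, time.Now().Unix(), []Metric{
//		{Name: "flops", Value: Float(42.0)},
//	})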

func (m *MemoryStore) GetLevel(selector []string) *level {
	return m.root.findLevelOrCreate(selector, len(m.metrics))
}

// Assumes that `minfo` in `metrics` is filled in!
func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metrics []Metric) error {
	l = l.findLevelOrCreate(selector, len(m.metrics))
	l.lock.Lock()
	defer l.lock.Unlock()

	for _, metric := range metrics {
		if metric.minfo.frequency == 0 {
			continue
		}

		b := l.metrics[metric.minfo.offset]
		if b == nil {
			// First write to this metric and level
			b = newBuffer(ts, metric.minfo.frequency)
			l.metrics[metric.minfo.offset] = b
		}

		nb, err := b.write(ts, metric.Value)
		if err != nil {
			return err
		}

		// Last write created a new buffer...
		if b != nb {
			l.metrics[metric.minfo.offset] = nb
		}
	}
	return nil
}

// Returns all values for metric `metric` from `from` to `to` for the selected level(s).
// If the level does not hold the metric itself, the data will be aggregated recursively from the children.
// The second and third return value are the actual from/to for the data. Those can be different from
// the range asked for if no data was available.
func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]Float, int64, int64, error) {
	if from > to {
		return nil, 0, 0, errors.New("invalid time range")
	}

	minfo, ok := m.metrics[metric]
	if !ok {
		return nil, 0, 0, errors.New("unknown metric: " + metric)
	}

	n, data := 0, make([]Float, (to-from)/minfo.frequency+1)
	err := m.root.findBuffers(selector, minfo.offset, func(b *buffer) error {
		cdata, cfrom, cto, err := b.read(from, to, data)
		if err != nil {
			return err
		}

		if n == 0 {
			from, to = cfrom, cto
		} else if from != cfrom || to != cto || len(data) != len(cdata) {
			return ErrDataDoesNotAlign
		}

		data = cdata
		n += 1
		return nil
	})

	if err != nil {
		return nil, 0, 0, err
	} else if n == 0 {
		return nil, 0, 0, errors.New("metric or host not found")
	} else if n > 1 {
		if minfo.aggregation == AvgAggregation {
			normalize := 1. / Float(n)
			for i := 0; i < len(data); i++ {
				data[i] *= normalize
			}
		} else if minfo.aggregation != SumAggregation {
			return nil, 0, 0, errors.New("invalid aggregation")
		}
	}

	return data, from, to, nil
}
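// Read usage sketch (the Selector value `sel` is assumed to be built elsewhere;
// metric name and time range are made up):
//
//	data, realFrom, realTo, err := store.Read(sel, "flops", from, to)
//	// realFrom/realTo may differ from from/to if data was missing at the edges.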

// Release all buffers for the selected level and all its children that contain only
// values older than `t`.
func (m *MemoryStore) Free(selector Selector, t int64) (int, error) {
	n := 0
	err := m.root.findBuffers(selector, -1, func(b *buffer) error {
		m, err := b.free(t)
		n += m
		return err
	})
	return n, err
}

func (m *MemoryStore) FreeAll() error {
	for k := range m.root.children {
		delete(m.root.children, k)
	}

	return nil
}

// Given a selector, return a list of all children of the level selected.
func (m *MemoryStore) ListChildren(selector []string) []string {
	lvl := m.root.findLevelOrCreate(selector, -1)
	if lvl == nil {
		return nil
	}

	lvl.lock.RLock()
	defer lvl.lock.RUnlock()

	children := make([]string, 0, len(lvl.children))
	for child := range lvl.children {
		children = append(children, child)
	}

	return children
}