Lazy-init children; Use slice for metric buffers

This commit is contained in:
Lou Knauer 2021-09-08 10:29:36 +02:00
parent e4c3bc4db1
commit a269ab423b
3 changed files with 130 additions and 63 deletions

View File

@ -45,7 +45,7 @@ func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error)
for i := 0; i < len(levels); i++ {
dir := path.Join(archiveRoot, path.Join(selectors[i]...))
err := levels[i].toArchive(dir, from, to)
err := levels[i].toArchive(dir, from, to, m)
if err != nil {
return i, err
}
@ -54,7 +54,7 @@ func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error)
return len(levels), nil
}
func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) {
func (l *level) toArchiveFile(from, to int64, m *MemoryStore) (*ArchiveFile, error) {
l.lock.RLock()
defer l.lock.RUnlock()
@ -64,7 +64,12 @@ func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) {
Children: make(map[string]*ArchiveFile),
}
for metric, b := range l.metrics {
for metric, minfo := range m.metrics {
b := l.metrics[minfo.offset]
if b == nil {
continue
}
data := make([]Float, (to-from)/b.frequency+1)
data, start, end, err := b.read(from, to, data)
if err != nil {
@ -83,7 +88,7 @@ func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) {
}
for name, child := range l.children {
val, err := child.toArchiveFile(from, to)
val, err := child.toArchiveFile(from, to, m)
if err != nil {
return nil, err
}
@ -94,8 +99,8 @@ func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) {
return retval, nil
}
func (l *level) toArchive(dir string, from, to int64) error {
af, err := l.toArchiveFile(from, to)
func (l *level) toArchive(dir string, from, to int64, m *MemoryStore) error {
af, err := l.toArchiveFile(from, to, m)
if err != nil {
return err
}
@ -126,10 +131,10 @@ func (l *level) toArchive(dir string, from, to int64) error {
// This function can only be called once and before the very first write or read.
// Unlike ToArchive, this function is NOT thread-safe.
func (m *MemoryStore) FromArchive(archiveRoot string, from int64) (int, error) {
return m.root.fromArchive(archiveRoot, from)
return m.root.fromArchive(archiveRoot, from, m)
}
func (l *level) loadFile(af *ArchiveFile) error {
func (l *level) loadFile(af *ArchiveFile, m *MemoryStore) error {
for name, metric := range af.Metrics {
n := len(metric.Data)
b := &buffer{
@ -140,9 +145,14 @@ func (l *level) loadFile(af *ArchiveFile) error {
next: nil,
}
prev, ok := l.metrics[name]
minfo, ok := m.metrics[name]
if !ok {
l.metrics[name] = b
return errors.New("Unkown metric: " + name)
}
prev := l.metrics[minfo.offset]
if prev == nil {
l.metrics[minfo.offset] = b
} else {
if prev.start > b.start {
return errors.New("wooops")
@ -151,20 +161,20 @@ func (l *level) loadFile(af *ArchiveFile) error {
b.prev = prev
prev.next = b
}
l.metrics[name] = b
l.metrics[minfo.offset] = b
}
for sel, childAf := range af.Children {
child, ok := l.children[sel]
if !ok {
child = &level{
metrics: make(map[string]*buffer),
metrics: make([]*buffer, len(m.metrics)),
children: make(map[string]*level),
}
l.children[sel] = child
}
err := child.loadFile(childAf)
err := child.loadFile(childAf, m)
if err != nil {
return err
}
@ -173,7 +183,7 @@ func (l *level) loadFile(af *ArchiveFile) error {
return nil
}
func (l *level) fromArchive(dir string, from int64) (int, error) {
func (l *level) fromArchive(dir string, from int64, m *MemoryStore) (int, error) {
direntries, err := os.ReadDir(dir)
if err != nil {
return 0, err
@ -184,11 +194,11 @@ func (l *level) fromArchive(dir string, from int64) (int, error) {
for _, e := range direntries {
if e.IsDir() {
child := &level{
metrics: make(map[string]*buffer),
metrics: make([]*buffer, len(m.metrics)),
children: make(map[string]*level),
}
files, err := child.fromArchive(path.Join(dir, e.Name()), from)
files, err := child.fromArchive(path.Join(dir, e.Name()), from, m)
filesLoaded += files
if err != nil {
return filesLoaded, err
@ -219,7 +229,7 @@ func (l *level) fromArchive(dir string, from int64) (int, error) {
return filesLoaded, err
}
err = l.loadFile(af)
err = l.loadFile(af, m)
if err != nil {
return filesLoaded, err
}

View File

@ -153,8 +153,8 @@ func (b *buffer) free(t int64) (int, error) {
// Can be both a leaf or a inner node. In this tree structue, inner nodes can
// also hold data (in `metrics`).
type level struct {
lock sync.RWMutex //
metrics map[string]*buffer // Every level can store metrics.
lock sync.RWMutex
metrics []*buffer // Every level can store metrics.
children map[string]*level // Lower levels.
}
@ -162,44 +162,56 @@ type level struct {
// it does not exist. Example selector in the context of the
// ClusterCockpit could be: []string{ "emmy", "host123", "cpu", "0" }
// This function would probably benefit a lot from `level.children` beeing a `sync.Map`?
func (l *level) findLevelOrCreate(selector []string) *level {
func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level {
if len(selector) == 0 {
return l
}
// Allow concurrent reads:
l.lock.RLock()
var child *level
var ok bool
if l.children == nil {
// Children map needs to be created...
l.lock.RUnlock()
} else {
child, ok := l.children[selector[0]]
l.lock.RUnlock()
if ok {
return child.findLevelOrCreate(selector[1:])
return child.findLevelOrCreate(selector[1:], nMetrics)
}
}
// The level does not exist, take write lock for unqiue access:
l.lock.Lock()
// While this thread waited for the write lock, another thread
// could have created the child node.
if l.children != nil {
child, ok = l.children[selector[0]]
if ok {
l.lock.Unlock()
return child.findLevelOrCreate(selector[1:])
return child.findLevelOrCreate(selector[1:], nMetrics)
}
} else {
l.children = make(map[string]*level)
}
child = &level{
metrics: make(map[string]*buffer),
children: make(map[string]*level),
metrics: make([]*buffer, nMetrics),
children: nil,
}
l.children[selector[0]] = child
l.lock.Unlock()
return child.findLevelOrCreate(selector[1:])
return child.findLevelOrCreate(selector[1:], nMetrics)
}
// This function assmumes that `l.lock` is LOCKED!
// Read `buffer.read` for context.
// If this level does not have data for the requested metric, the data
// is aggregated timestep-wise from all the children (recursively).
func (l *level) read(metric string, from, to int64, data []Float) ([]Float, int, int64, int64, error) {
if b, ok := l.metrics[metric]; ok {
func (l *level) read(offset int, from, to int64, data []Float) ([]Float, int, int64, int64, error) {
if b := l.metrics[offset]; b != nil {
// Whoo, this is the "native" level of this metric:
data, from, to, err := b.read(from, to, data)
return data, 1, from, to, err
@ -212,7 +224,7 @@ func (l *level) read(metric string, from, to int64, data []Float) ([]Float, int,
n := 0
for _, child := range l.children {
child.lock.RLock()
cdata, cn, cfrom, cto, err := child.read(metric, from, to, data)
cdata, cn, cfrom, cto, err := child.read(offset, from, to, data)
child.lock.RUnlock()
if err == ErrNoData {
@ -255,6 +267,10 @@ func (l *level) free(t int64) (int, error) {
n := 0
for _, b := range l.metrics {
if b == nil {
continue
}
m, err := b.free(t)
n += m
if err != nil {
@ -273,40 +289,81 @@ func (l *level) free(t int64) (int, error) {
return n, nil
}
type AggregationStrategy int
const (
NoAggregation AggregationStrategy = iota
SumAggregation
AvgAggregation
)
type MemoryStore struct {
root level // root of the tree structure
metrics map[string]MetricConfig
metrics map[string]struct {
offset int
aggregation AggregationStrategy
frequency int64
}
}
func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore {
ms := make(map[string]struct {
offset int
aggregation AggregationStrategy
frequency int64
})
offset := 0
for key, config := range metrics {
aggregation := NoAggregation
if config.Aggregation == "sum" {
aggregation = SumAggregation
} else if config.Aggregation == "avg" {
aggregation = AvgAggregation
} else if config.Aggregation != "" {
panic("invalid aggregation strategy: " + config.Aggregation)
}
ms[key] = struct {
offset int
aggregation AggregationStrategy
frequency int64
}{
offset: offset,
aggregation: aggregation,
frequency: config.Frequency,
}
offset += 1
}
return &MemoryStore{
root: level{
metrics: make(map[string]*buffer),
metrics: make([]*buffer, len(metrics)),
children: make(map[string]*level),
},
metrics: metrics,
metrics: ms,
}
}
// Write all values in `metrics` to the level specified by `selector` for time `ts`.
// Look at `findLevelOrCreate` for how selectors work.
func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error {
l := m.root.findLevelOrCreate(selector)
l := m.root.findLevelOrCreate(selector, len(m.metrics))
l.lock.Lock()
defer l.lock.Unlock()
for _, metric := range metrics {
b, ok := l.metrics[metric.Name]
if !ok {
minfo, ok := m.metrics[metric.Name]
if !ok {
// return errors.New("unkown metric: " + metric.Name)
continue
}
b := l.metrics[minfo.offset]
if b == nil {
// First write to this metric and level
b = newBuffer(ts, minfo.Frequency)
l.metrics[metric.Name] = b
b = newBuffer(ts, minfo.frequency)
l.metrics[minfo.offset] = b
}
nb, err := b.write(ts, metric.Value)
@ -316,7 +373,7 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error
// Last write created a new buffer...
if b != nb {
l.metrics[metric.Name] = nb
l.metrics[minfo.offset] = nb
}
}
return nil
@ -326,7 +383,7 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error
// If the level does not hold the metric itself, the data will be aggregated recursively from the children.
// See `level.read` for more information.
func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([]Float, int64, int64, error) {
l := m.root.findLevelOrCreate(selector)
l := m.root.findLevelOrCreate(selector, len(m.metrics))
l.lock.RLock()
defer l.lock.RUnlock()
@ -339,20 +396,20 @@ func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([]
return nil, 0, 0, errors.New("unkown metric: " + metric)
}
data := make([]Float, (to-from)/minfo.Frequency)
data, n, from, to, err := l.read(metric, from, to, data)
data := make([]Float, (to-from)/minfo.frequency+1)
data, n, from, to, err := l.read(minfo.offset, from, to, data)
if err != nil {
return nil, 0, 0, err
}
if n > 1 {
if minfo.Aggregation == "avg" {
if minfo.aggregation == AvgAggregation {
normalize := 1. / Float(n)
for i := 0; i < len(data); i++ {
data[i] *= normalize
}
} else if minfo.Aggregation != "sum" {
return nil, 0, 0, errors.New("invalid aggregation strategy: " + minfo.Aggregation)
} else if minfo.aggregation != SumAggregation {
return nil, 0, 0, errors.New("invalid aggregation")
}
}
@ -362,5 +419,5 @@ func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([]
// Release all buffers for the selected level and all its children that contain only
// values older than `t`.
func (m *MemoryStore) Free(selector []string, t int64) (int, error) {
return m.root.findLevelOrCreate(selector).free(t)
return m.root.findLevelOrCreate(selector, len(m.metrics)).free(t)
}

View File

@ -61,8 +61,8 @@ func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) {
// This function assmumes that `l.lock` is LOCKED!
// It basically works just like level.read but calculates min/max/avg for that data level.read would return.
// TODO: Make this DRY?
func (l *level) stats(metric string, from, to int64, aggregation string) (Stats, int64, int64, error) {
if b, ok := l.metrics[metric]; ok {
func (l *level) stats(offset int, from, to int64, aggreg AggregationStrategy) (Stats, int64, int64, error) {
if b := l.metrics[offset]; b != nil {
return b.stats(from, to)
}
@ -75,7 +75,7 @@ func (l *level) stats(metric string, from, to int64, aggregation string) (Stats,
avg, min, max := Float(0), Float(math.MaxFloat32), Float(-math.MaxFloat32)
for _, child := range l.children {
child.lock.RLock()
stats, cfrom, cto, err := child.stats(metric, from, to, aggregation)
stats, cfrom, cto, err := child.stats(offset, from, to, aggreg)
child.lock.RUnlock()
if err == ErrNoData {
@ -104,10 +104,10 @@ func (l *level) stats(metric string, from, to int64, aggregation string) (Stats,
return Stats{}, 0, 0, ErrNoData
}
if aggregation == "avg" {
if aggreg == AvgAggregation {
avg /= Float(n)
} else if aggregation != "sum" {
return Stats{}, 0, 0, errors.New("invalid aggregation strategy: " + aggregation)
} else if aggreg != SumAggregation {
return Stats{}, 0, 0, errors.New("invalid aggregation")
}
return Stats{
@ -119,7 +119,7 @@ func (l *level) stats(metric string, from, to int64, aggregation string) (Stats,
}
func (m *MemoryStore) Stats(selector []string, metric string, from, to int64) (*Stats, int64, int64, error) {
l := m.root.findLevelOrCreate(selector)
l := m.root.findLevelOrCreate(selector, len(m.metrics))
l.lock.RLock()
defer l.lock.RUnlock()
@ -132,6 +132,6 @@ func (m *MemoryStore) Stats(selector []string, metric string, from, to int64) (*
return nil, 0, 0, errors.New("unkown metric: " + metric)
}
stats, from, to, err := l.stats(metric, from, to, minfo.Aggregation)
stats, from, to, err := l.stats(minfo.offset, from, to, minfo.aggregation)
return &stats, from, to, err
}