diff --git a/archive.go b/archive.go index ee8860c..18f8c32 100644 --- a/archive.go +++ b/archive.go @@ -45,7 +45,7 @@ func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error) for i := 0; i < len(levels); i++ { dir := path.Join(archiveRoot, path.Join(selectors[i]...)) - err := levels[i].toArchive(dir, from, to) + err := levels[i].toArchive(dir, from, to, m) if err != nil { return i, err } @@ -54,7 +54,7 @@ func (m *MemoryStore) ToArchive(archiveRoot string, from, to int64) (int, error) return len(levels), nil } -func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) { +func (l *level) toArchiveFile(from, to int64, m *MemoryStore) (*ArchiveFile, error) { l.lock.RLock() defer l.lock.RUnlock() @@ -64,7 +64,12 @@ func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) { Children: make(map[string]*ArchiveFile), } - for metric, b := range l.metrics { + for metric, minfo := range m.metrics { + b := l.metrics[minfo.offset] + if b == nil { + continue + } + data := make([]Float, (to-from)/b.frequency+1) data, start, end, err := b.read(from, to, data) if err != nil { @@ -83,7 +88,7 @@ func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) { } for name, child := range l.children { - val, err := child.toArchiveFile(from, to) + val, err := child.toArchiveFile(from, to, m) if err != nil { return nil, err } @@ -94,8 +99,8 @@ func (l *level) toArchiveFile(from, to int64) (*ArchiveFile, error) { return retval, nil } -func (l *level) toArchive(dir string, from, to int64) error { - af, err := l.toArchiveFile(from, to) +func (l *level) toArchive(dir string, from, to int64, m *MemoryStore) error { + af, err := l.toArchiveFile(from, to, m) if err != nil { return err } @@ -126,10 +131,10 @@ func (l *level) toArchive(dir string, from, to int64) error { // This function can only be called once and before the very first write or read. // Unlike ToArchive, this function is NOT thread-safe. func (m *MemoryStore) FromArchive(archiveRoot string, from int64) (int, error) { - return m.root.fromArchive(archiveRoot, from) + return m.root.fromArchive(archiveRoot, from, m) } -func (l *level) loadFile(af *ArchiveFile) error { +func (l *level) loadFile(af *ArchiveFile, m *MemoryStore) error { for name, metric := range af.Metrics { n := len(metric.Data) b := &buffer{ @@ -140,9 +145,14 @@ func (l *level) loadFile(af *ArchiveFile) error { next: nil, } - prev, ok := l.metrics[name] + minfo, ok := m.metrics[name] if !ok { - l.metrics[name] = b + return errors.New("Unkown metric: " + name) + } + + prev := l.metrics[minfo.offset] + if prev == nil { + l.metrics[minfo.offset] = b } else { if prev.start > b.start { return errors.New("wooops") @@ -151,20 +161,20 @@ func (l *level) loadFile(af *ArchiveFile) error { b.prev = prev prev.next = b } - l.metrics[name] = b + l.metrics[minfo.offset] = b } for sel, childAf := range af.Children { child, ok := l.children[sel] if !ok { child = &level{ - metrics: make(map[string]*buffer), + metrics: make([]*buffer, len(m.metrics)), children: make(map[string]*level), } l.children[sel] = child } - err := child.loadFile(childAf) + err := child.loadFile(childAf, m) if err != nil { return err } @@ -173,7 +183,7 @@ func (l *level) loadFile(af *ArchiveFile) error { return nil } -func (l *level) fromArchive(dir string, from int64) (int, error) { +func (l *level) fromArchive(dir string, from int64, m *MemoryStore) (int, error) { direntries, err := os.ReadDir(dir) if err != nil { return 0, err @@ -184,11 +194,11 @@ func (l *level) fromArchive(dir string, from int64) (int, error) { for _, e := range direntries { if e.IsDir() { child := &level{ - metrics: make(map[string]*buffer), + metrics: make([]*buffer, len(m.metrics)), children: make(map[string]*level), } - files, err := child.fromArchive(path.Join(dir, e.Name()), from) + files, err := child.fromArchive(path.Join(dir, e.Name()), from, m) filesLoaded += files if err != nil { return filesLoaded, err @@ -219,7 +229,7 @@ func (l *level) fromArchive(dir string, from int64) (int, error) { return filesLoaded, err } - err = l.loadFile(af) + err = l.loadFile(af, m) if err != nil { return filesLoaded, err } diff --git a/memstore.go b/memstore.go index 80f2943..81966bf 100644 --- a/memstore.go +++ b/memstore.go @@ -153,53 +153,65 @@ func (b *buffer) free(t int64) (int, error) { // Can be both a leaf or a inner node. In this tree structue, inner nodes can // also hold data (in `metrics`). type level struct { - lock sync.RWMutex // - metrics map[string]*buffer // Every level can store metrics. - children map[string]*level // Lower levels. + lock sync.RWMutex + metrics []*buffer // Every level can store metrics. + children map[string]*level // Lower levels. } // Find the correct level for the given selector, creating it if // it does not exist. Example selector in the context of the // ClusterCockpit could be: []string{ "emmy", "host123", "cpu", "0" } // This function would probably benefit a lot from `level.children` beeing a `sync.Map`? -func (l *level) findLevelOrCreate(selector []string) *level { +func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level { if len(selector) == 0 { return l } // Allow concurrent reads: l.lock.RLock() - child, ok := l.children[selector[0]] - l.lock.RUnlock() - if ok { - return child.findLevelOrCreate(selector[1:]) + var child *level + var ok bool + if l.children == nil { + // Children map needs to be created... + l.lock.RUnlock() + } else { + child, ok := l.children[selector[0]] + l.lock.RUnlock() + if ok { + return child.findLevelOrCreate(selector[1:], nMetrics) + } } // The level does not exist, take write lock for unqiue access: l.lock.Lock() // While this thread waited for the write lock, another thread // could have created the child node. - child, ok = l.children[selector[0]] - if ok { - l.lock.Unlock() - return child.findLevelOrCreate(selector[1:]) + if l.children != nil { + child, ok = l.children[selector[0]] + if ok { + l.lock.Unlock() + return child.findLevelOrCreate(selector[1:], nMetrics) + } + } else { + l.children = make(map[string]*level) } child = &level{ - metrics: make(map[string]*buffer), - children: make(map[string]*level), + metrics: make([]*buffer, nMetrics), + children: nil, } + l.children[selector[0]] = child l.lock.Unlock() - return child.findLevelOrCreate(selector[1:]) + return child.findLevelOrCreate(selector[1:], nMetrics) } // This function assmumes that `l.lock` is LOCKED! // Read `buffer.read` for context. // If this level does not have data for the requested metric, the data // is aggregated timestep-wise from all the children (recursively). -func (l *level) read(metric string, from, to int64, data []Float) ([]Float, int, int64, int64, error) { - if b, ok := l.metrics[metric]; ok { +func (l *level) read(offset int, from, to int64, data []Float) ([]Float, int, int64, int64, error) { + if b := l.metrics[offset]; b != nil { // Whoo, this is the "native" level of this metric: data, from, to, err := b.read(from, to, data) return data, 1, from, to, err @@ -212,7 +224,7 @@ func (l *level) read(metric string, from, to int64, data []Float) ([]Float, int, n := 0 for _, child := range l.children { child.lock.RLock() - cdata, cn, cfrom, cto, err := child.read(metric, from, to, data) + cdata, cn, cfrom, cto, err := child.read(offset, from, to, data) child.lock.RUnlock() if err == ErrNoData { @@ -255,6 +267,10 @@ func (l *level) free(t int64) (int, error) { n := 0 for _, b := range l.metrics { + if b == nil { + continue + } + m, err := b.free(t) n += m if err != nil { @@ -273,40 +289,81 @@ func (l *level) free(t int64) (int, error) { return n, nil } +type AggregationStrategy int + +const ( + NoAggregation AggregationStrategy = iota + SumAggregation + AvgAggregation +) + type MemoryStore struct { root level // root of the tree structure - metrics map[string]MetricConfig + metrics map[string]struct { + offset int + aggregation AggregationStrategy + frequency int64 + } } func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { + ms := make(map[string]struct { + offset int + aggregation AggregationStrategy + frequency int64 + }) + + offset := 0 + for key, config := range metrics { + aggregation := NoAggregation + if config.Aggregation == "sum" { + aggregation = SumAggregation + } else if config.Aggregation == "avg" { + aggregation = AvgAggregation + } else if config.Aggregation != "" { + panic("invalid aggregation strategy: " + config.Aggregation) + } + + ms[key] = struct { + offset int + aggregation AggregationStrategy + frequency int64 + }{ + offset: offset, + aggregation: aggregation, + frequency: config.Frequency, + } + + offset += 1 + } + return &MemoryStore{ root: level{ - metrics: make(map[string]*buffer), + metrics: make([]*buffer, len(metrics)), children: make(map[string]*level), }, - metrics: metrics, + metrics: ms, } } // Write all values in `metrics` to the level specified by `selector` for time `ts`. // Look at `findLevelOrCreate` for how selectors work. func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error { - l := m.root.findLevelOrCreate(selector) + l := m.root.findLevelOrCreate(selector, len(m.metrics)) l.lock.Lock() defer l.lock.Unlock() for _, metric := range metrics { - b, ok := l.metrics[metric.Name] + minfo, ok := m.metrics[metric.Name] if !ok { - minfo, ok := m.metrics[metric.Name] - if !ok { - // return errors.New("unkown metric: " + metric.Name) - continue - } + continue + } + b := l.metrics[minfo.offset] + if b == nil { // First write to this metric and level - b = newBuffer(ts, minfo.Frequency) - l.metrics[metric.Name] = b + b = newBuffer(ts, minfo.frequency) + l.metrics[minfo.offset] = b } nb, err := b.write(ts, metric.Value) @@ -316,7 +373,7 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error // Last write created a new buffer... if b != nb { - l.metrics[metric.Name] = nb + l.metrics[minfo.offset] = nb } } return nil @@ -326,7 +383,7 @@ func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error // If the level does not hold the metric itself, the data will be aggregated recursively from the children. // See `level.read` for more information. func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([]Float, int64, int64, error) { - l := m.root.findLevelOrCreate(selector) + l := m.root.findLevelOrCreate(selector, len(m.metrics)) l.lock.RLock() defer l.lock.RUnlock() @@ -339,20 +396,20 @@ func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([] return nil, 0, 0, errors.New("unkown metric: " + metric) } - data := make([]Float, (to-from)/minfo.Frequency) - data, n, from, to, err := l.read(metric, from, to, data) + data := make([]Float, (to-from)/minfo.frequency+1) + data, n, from, to, err := l.read(minfo.offset, from, to, data) if err != nil { return nil, 0, 0, err } if n > 1 { - if minfo.Aggregation == "avg" { + if minfo.aggregation == AvgAggregation { normalize := 1. / Float(n) for i := 0; i < len(data); i++ { data[i] *= normalize } - } else if minfo.Aggregation != "sum" { - return nil, 0, 0, errors.New("invalid aggregation strategy: " + minfo.Aggregation) + } else if minfo.aggregation != SumAggregation { + return nil, 0, 0, errors.New("invalid aggregation") } } @@ -362,5 +419,5 @@ func (m *MemoryStore) Read(selector []string, metric string, from, to int64) ([] // Release all buffers for the selected level and all its children that contain only // values older than `t`. func (m *MemoryStore) Free(selector []string, t int64) (int, error) { - return m.root.findLevelOrCreate(selector).free(t) + return m.root.findLevelOrCreate(selector, len(m.metrics)).free(t) } diff --git a/stats.go b/stats.go index e47f74d..53e4e9a 100644 --- a/stats.go +++ b/stats.go @@ -61,8 +61,8 @@ func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) { // This function assmumes that `l.lock` is LOCKED! // It basically works just like level.read but calculates min/max/avg for that data level.read would return. // TODO: Make this DRY? -func (l *level) stats(metric string, from, to int64, aggregation string) (Stats, int64, int64, error) { - if b, ok := l.metrics[metric]; ok { +func (l *level) stats(offset int, from, to int64, aggreg AggregationStrategy) (Stats, int64, int64, error) { + if b := l.metrics[offset]; b != nil { return b.stats(from, to) } @@ -75,7 +75,7 @@ func (l *level) stats(metric string, from, to int64, aggregation string) (Stats, avg, min, max := Float(0), Float(math.MaxFloat32), Float(-math.MaxFloat32) for _, child := range l.children { child.lock.RLock() - stats, cfrom, cto, err := child.stats(metric, from, to, aggregation) + stats, cfrom, cto, err := child.stats(offset, from, to, aggreg) child.lock.RUnlock() if err == ErrNoData { @@ -104,10 +104,10 @@ func (l *level) stats(metric string, from, to int64, aggregation string) (Stats, return Stats{}, 0, 0, ErrNoData } - if aggregation == "avg" { + if aggreg == AvgAggregation { avg /= Float(n) - } else if aggregation != "sum" { - return Stats{}, 0, 0, errors.New("invalid aggregation strategy: " + aggregation) + } else if aggreg != SumAggregation { + return Stats{}, 0, 0, errors.New("invalid aggregation") } return Stats{ @@ -119,7 +119,7 @@ func (l *level) stats(metric string, from, to int64, aggregation string) (Stats, } func (m *MemoryStore) Stats(selector []string, metric string, from, to int64) (*Stats, int64, int64, error) { - l := m.root.findLevelOrCreate(selector) + l := m.root.findLevelOrCreate(selector, len(m.metrics)) l.lock.RLock() defer l.lock.RUnlock() @@ -132,6 +132,6 @@ func (m *MemoryStore) Stats(selector []string, metric string, from, to int64) (* return nil, 0, 0, errors.New("unkown metric: " + metric) } - stats, from, to, err := l.stats(metric, from, to, minfo.Aggregation) + stats, from, to, err := l.stats(minfo.offset, from, to, minfo.aggregation) return &stats, from, to, err }