Move selector type and read logic

This commit is contained in:
Lou Knauer
2022-07-20 16:09:13 +02:00
parent e846a4625e
commit 1125678bc0
4 changed files with 198 additions and 125 deletions

View File

@@ -72,8 +72,6 @@ func (c *chunk) firstWrite() int64 {
return c.start + (c.frequency / 2)
}
func (c *chunk) close() {}
// func interpolate(idx int, data []Float) Float {
// if idx == 0 || idx+1 == len(data) {
// return NaN

View File

@@ -1,6 +1,7 @@
package memstore
import (
"errors"
"sync"
"github.com/ClusterCockpit/cc-metric-store/internal/types"
@@ -66,6 +67,22 @@ func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level {
return child.findLevelOrCreate(selector[1:], nMetrics)
}
func (l *Level) findLevel(selector []string) *Level {
if len(selector) == 0 {
return l
}
l.lock.RLock()
defer l.lock.RUnlock()
lvl := l.sublevels[selector[0]]
if lvl == nil {
return nil
}
return lvl.findLevel(selector[1:])
}
func (l *Level) free(t int64) (delme bool, n int) {
l.lock.Lock()
defer l.lock.Unlock()
@@ -171,3 +188,133 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric
}
return nil
}
func (m *MemoryStore) Free(t int64) int {
_, n := m.root.free(t)
return n
}
func (l *Level) findBuffers(selector types.Selector, offset int, f func(c *chunk) error) error {
l.lock.RLock()
defer l.lock.RUnlock()
if len(selector) == 0 {
b := l.metrics[offset]
if b != nil {
return f(b)
}
for _, lvl := range l.sublevels {
err := lvl.findBuffers(nil, offset, f)
if err != nil {
return err
}
}
return nil
}
sel := selector[0]
if len(sel.String) != 0 && l.sublevels != nil {
lvl, ok := l.sublevels[sel.String]
if ok {
err := lvl.findBuffers(selector[1:], offset, f)
if err != nil {
return err
}
}
return nil
}
if sel.Group != nil && l.sublevels != nil {
for _, key := range sel.Group {
lvl, ok := l.sublevels[key]
if ok {
err := lvl.findBuffers(selector[1:], offset, f)
if err != nil {
return err
}
}
}
return nil
}
if sel.Any && l.sublevels != nil {
for _, lvl := range l.sublevels {
if err := lvl.findBuffers(selector[1:], offset, f); err != nil {
return err
}
}
return nil
}
return nil
}
var (
ErrNoData error = errors.New("no data for this metric/level")
ErrDataDoesNotAlign error = errors.New("data from lower granularities does not align")
)
// Returns all values for metric `metric` from `from` to `to` for the selected level(s).
// If the level does not hold the metric itself, the data will be aggregated recursively from the children.
// The second and third return value are the actual from/to for the data. Those can be different from
// the range asked for if no data was available.
func (m *MemoryStore) Read(selector types.Selector, metric string, from, to int64) ([]types.Float, int64, int64, error) {
if from > to {
return nil, 0, 0, errors.New("invalid time range")
}
mc, ok := m.metrics[metric]
if !ok {
return nil, 0, 0, errors.New("unkown metric: " + metric)
}
n, data := 0, make([]types.Float, (to-from)/mc.Frequency+1)
err := m.root.findBuffers(selector, mc.Offset, func(c *chunk) error {
cdata, cfrom, cto, err := c.read(from, to, data)
if err != nil {
return err
}
if n == 0 {
from, to = cfrom, cto
} else if from != cfrom || to != cto || len(data) != len(cdata) {
missingfront, missingback := int((from-cfrom)/mc.Frequency), int((to-cto)/mc.Frequency)
if missingfront != 0 {
return ErrDataDoesNotAlign
}
newlen := len(cdata) - missingback
if newlen < 1 {
return ErrDataDoesNotAlign
}
cdata = cdata[0:newlen]
if len(cdata) != len(data) {
return ErrDataDoesNotAlign
}
from, to = cfrom, cto
}
data = cdata
n += 1
return nil
})
if err != nil {
return nil, 0, 0, err
} else if n == 0 {
return nil, 0, 0, errors.New("metric or host not found")
} else if n > 1 {
if mc.Aggregation == types.AvgAggregation {
normalize := 1. / types.Float(n)
for i := 0; i < len(data); i++ {
data[i] *= normalize
}
} else if mc.Aggregation != types.SumAggregation {
return nil, 0, 0, errors.New("invalid aggregation")
}
}
return data, from, to, nil
}