diff --git a/internal/memstore/chunks.go b/internal/memstore/chunks.go index a0ce97a..b84adaf 100644 --- a/internal/memstore/chunks.go +++ b/internal/memstore/chunks.go @@ -72,8 +72,6 @@ func (c *chunk) firstWrite() int64 { return c.start + (c.frequency / 2) } -func (c *chunk) close() {} - // func interpolate(idx int, data []Float) Float { // if idx == 0 || idx+1 == len(data) { // return NaN diff --git a/internal/memstore/memstore.go b/internal/memstore/memstore.go index 595d7fe..a87147c 100644 --- a/internal/memstore/memstore.go +++ b/internal/memstore/memstore.go @@ -1,6 +1,7 @@ package memstore import ( + "errors" "sync" "github.com/ClusterCockpit/cc-metric-store/internal/types" @@ -66,6 +67,22 @@ func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level { return child.findLevelOrCreate(selector[1:], nMetrics) } +func (l *Level) findLevel(selector []string) *Level { + if len(selector) == 0 { + return l + } + + l.lock.RLock() + defer l.lock.RUnlock() + + lvl := l.sublevels[selector[0]] + if lvl == nil { + return nil + } + + return lvl.findLevel(selector[1:]) +} + func (l *Level) free(t int64) (delme bool, n int) { l.lock.Lock() defer l.lock.Unlock() @@ -171,3 +188,133 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric } return nil } + +func (m *MemoryStore) Free(t int64) int { + _, n := m.root.free(t) + return n +} + +func (l *Level) findBuffers(selector types.Selector, offset int, f func(c *chunk) error) error { + l.lock.RLock() + defer l.lock.RUnlock() + + if len(selector) == 0 { + b := l.metrics[offset] + if b != nil { + return f(b) + } + + for _, lvl := range l.sublevels { + err := lvl.findBuffers(nil, offset, f) + if err != nil { + return err + } + } + return nil + } + + sel := selector[0] + if len(sel.String) != 0 && l.sublevels != nil { + lvl, ok := l.sublevels[sel.String] + if ok { + err := lvl.findBuffers(selector[1:], offset, f) + if err != nil { + return err + } + } + return nil + } + + if sel.Group != nil && l.sublevels != nil { + for _, key := range sel.Group { + lvl, ok := l.sublevels[key] + if ok { + err := lvl.findBuffers(selector[1:], offset, f) + if err != nil { + return err + } + } + } + return nil + } + + if sel.Any && l.sublevels != nil { + for _, lvl := range l.sublevels { + if err := lvl.findBuffers(selector[1:], offset, f); err != nil { + return err + } + } + return nil + } + + return nil +} + +var ( + ErrNoData error = errors.New("no data for this metric/level") + ErrDataDoesNotAlign error = errors.New("data from lower granularities does not align") +) + +// Returns all values for metric `metric` from `from` to `to` for the selected level(s). +// If the level does not hold the metric itself, the data will be aggregated recursively from the children. +// The second and third return value are the actual from/to for the data. Those can be different from +// the range asked for if no data was available. +func (m *MemoryStore) Read(selector types.Selector, metric string, from, to int64) ([]types.Float, int64, int64, error) { + if from > to { + return nil, 0, 0, errors.New("invalid time range") + } + + mc, ok := m.metrics[metric] + if !ok { + return nil, 0, 0, errors.New("unkown metric: " + metric) + } + + n, data := 0, make([]types.Float, (to-from)/mc.Frequency+1) + err := m.root.findBuffers(selector, mc.Offset, func(c *chunk) error { + cdata, cfrom, cto, err := c.read(from, to, data) + if err != nil { + return err + } + + if n == 0 { + from, to = cfrom, cto + } else if from != cfrom || to != cto || len(data) != len(cdata) { + missingfront, missingback := int((from-cfrom)/mc.Frequency), int((to-cto)/mc.Frequency) + if missingfront != 0 { + return ErrDataDoesNotAlign + } + + newlen := len(cdata) - missingback + if newlen < 1 { + return ErrDataDoesNotAlign + } + cdata = cdata[0:newlen] + if len(cdata) != len(data) { + return ErrDataDoesNotAlign + } + + from, to = cfrom, cto + } + + data = cdata + n += 1 + return nil + }) + + if err != nil { + return nil, 0, 0, err + } else if n == 0 { + return nil, 0, 0, errors.New("metric or host not found") + } else if n > 1 { + if mc.Aggregation == types.AvgAggregation { + normalize := 1. / types.Float(n) + for i := 0; i < len(data); i++ { + data[i] *= normalize + } + } else if mc.Aggregation != types.SumAggregation { + return nil, 0, 0, errors.New("invalid aggregation") + } + } + + return data, from, to, nil +} diff --git a/internal/types/selector.go b/internal/types/selector.go new file mode 100644 index 0000000..58b3631 --- /dev/null +++ b/internal/types/selector.go @@ -0,0 +1,51 @@ +package types + +import ( + "encoding/json" + "errors" +) + +type SelectorElement struct { + Any bool + String string + Group []string +} + +func (se *SelectorElement) UnmarshalJSON(input []byte) error { + if input[0] == '"' { + if err := json.Unmarshal(input, &se.String); err != nil { + return err + } + + if se.String == "*" { + se.Any = true + se.String = "" + } + + return nil + } + + if input[0] == '[' { + return json.Unmarshal(input, &se.Group) + } + + return errors.New("the Go SelectorElement type can only be a string or an array of strings") +} + +func (se *SelectorElement) MarshalJSON() ([]byte, error) { + if se.Any { + return []byte("\"*\""), nil + } + + if se.String != "" { + return json.Marshal(se.String) + } + + if se.Group != nil { + return json.Marshal(se.Group) + } + + return nil, errors.New("a Go Selector must be a non-empty string or a non-empty slice of strings") +} + +type Selector []SelectorElement diff --git a/selector.go b/selector.go deleted file mode 100644 index 25fe209..0000000 --- a/selector.go +++ /dev/null @@ -1,123 +0,0 @@ -package main - -import ( - "encoding/json" - "errors" -) - -type SelectorElement struct { - Any bool - String string - Group []string -} - -func (se *SelectorElement) UnmarshalJSON(input []byte) error { - if input[0] == '"' { - if err := json.Unmarshal(input, &se.String); err != nil { - return err - } - - if se.String == "*" { - se.Any = true - se.String = "" - } - - return nil - } - - if input[0] == '[' { - return json.Unmarshal(input, &se.Group) - } - - return errors.New("the Go SelectorElement type can only be a string or an array of strings") -} - -func (se *SelectorElement) MarshalJSON() ([]byte, error) { - if se.Any { - return []byte("\"*\""), nil - } - - if se.String != "" { - return json.Marshal(se.String) - } - - if se.Group != nil { - return json.Marshal(se.Group) - } - - return nil, errors.New("a Go Selector must be a non-empty string or a non-empty slice of strings") -} - -type Selector []SelectorElement - -func (l *level) findLevel(selector []string) *level { - if len(selector) == 0 { - return l - } - - l.lock.RLock() - defer l.lock.RUnlock() - - lvl := l.children[selector[0]] - if lvl == nil { - return nil - } - - return lvl.findLevel(selector[1:]) -} - -func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) error) error { - l.lock.RLock() - defer l.lock.RUnlock() - - if len(selector) == 0 { - b := l.metrics[offset] - if b != nil { - return f(b) - } - - for _, lvl := range l.children { - err := lvl.findBuffers(nil, offset, f) - if err != nil { - return err - } - } - return nil - } - - sel := selector[0] - if len(sel.String) != 0 && l.children != nil { - lvl, ok := l.children[sel.String] - if ok { - err := lvl.findBuffers(selector[1:], offset, f) - if err != nil { - return err - } - } - return nil - } - - if sel.Group != nil && l.children != nil { - for _, key := range sel.Group { - lvl, ok := l.children[key] - if ok { - err := lvl.findBuffers(selector[1:], offset, f) - if err != nil { - return err - } - } - } - return nil - } - - if sel.Any && l.children != nil { - for _, lvl := range l.children { - if err := lvl.findBuffers(selector[1:], offset, f); err != nil { - return err - } - } - return nil - } - - return nil -}