From b2528f958cf8185f9031535a9596f8fa49ee27f8 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 6 May 2024 09:27:28 +0200 Subject: [PATCH] Continue restructuring. Intermediate state. --- cmd/cc-metric-store/main.go | 23 +- internal/api/api.go | 37 +- internal/api/lineprotocol.go | 39 +- internal/config/config.go | 2 +- internal/{memstore => memorystore}/archive.go | 44 +- internal/memorystore/buffer.go | 241 ++++++++ debug.go => internal/memorystore/debug.go | 6 +- internal/memorystore/level.go | 183 ++++++ internal/memorystore/memorystore.go | 222 +++++++ internal/memorystore/selector.go | 51 ++ internal/{util => memorystore}/stats.go | 33 +- internal/memstore/memstore.go | 542 ------------------ internal/memstore/selector.go | 123 ---- 13 files changed, 795 insertions(+), 751 deletions(-) rename internal/{memstore => memorystore}/archive.go (92%) create mode 100644 internal/memorystore/buffer.go rename debug.go => internal/memorystore/debug.go (94%) create mode 100644 internal/memorystore/level.go create mode 100644 internal/memorystore/memorystore.go create mode 100644 internal/memorystore/selector.go rename internal/{util => memorystore}/stats.go (75%) delete mode 100644 internal/memstore/memstore.go delete mode 100644 internal/memstore/selector.go diff --git a/cmd/cc-metric-store/main.go b/cmd/cc-metric-store/main.go index 5d0f1a3..601dfe2 100644 --- a/cmd/cc-metric-store/main.go +++ b/cmd/cc-metric-store/main.go @@ -16,13 +16,13 @@ import ( "github.com/ClusterCockpit/cc-metric-store/internal/api" "github.com/ClusterCockpit/cc-metric-store/internal/config" - "github.com/ClusterCockpit/cc-metric-store/internal/memstore" + "github.com/ClusterCockpit/cc-metric-store/internal/memorystore" "github.com/google/gops/agent" ) var ( conf config.Config - memoryStore *memstore.MemoryStore = nil + ms *memorystore.MemoryStore = nil lastCheckpoint time.Time ) @@ -64,7 +64,7 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) { case <-ticks: t := time.Now().Add(-d) log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) - freed, err := memoryStore.Free(nil, t.Unix()) + freed, err := ms.Free(nil, t.Unix()) if err != nil { log.Printf("freeing up buffers failed: %s\n", err.Error()) } else { @@ -93,7 +93,7 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) { case <-ticks: log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339)) now := time.Now() - n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, + n, err := ms.ToCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), now.Unix()) if err != nil { log.Printf("checkpointing failed: %s\n", err.Error()) @@ -123,7 +123,7 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) { case <-ticks: t := time.Now().Add(-d) log.Printf("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339)) - n, err := memstore.ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix(), conf.Archive.DeleteInstead) + n, err := memorystore.ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix(), conf.Archive.DeleteInstead) if err != nil { log.Printf("archiving failed: %s\n", err.Error()) } else { @@ -143,7 +143,8 @@ func main() { startupTime := time.Now() conf = config.LoadConfiguration(configFile) - memoryStore = memstore.NewMemoryStore(conf.Metrics) + memorystore.Init(conf.Metrics) + ms = memorystore.GetMemoryStore() if enableGopsAgent || conf.Debug.EnableGops { if err := agent.Listen(agent.Options{}); err != nil { @@ -167,8 +168,8 @@ func main() { restoreFrom := startupTime.Add(-d) log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339)) - files, err := memoryStore.FromCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix()) - loadedData := memoryStore.SizeInBytes() / 1024 / 1024 // In MB + files, err := ms.FromCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix()) + loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB if err != nil { log.Fatalf("Loading checkpoints failed: %s\n", err.Error()) } else { @@ -195,7 +196,7 @@ func main() { for { sig := <-sigs if sig == syscall.SIGUSR1 { - memoryStore.DebugDump(bufio.NewWriter(os.Stdout), nil) + ms.DebugDump(bufio.NewWriter(os.Stdout), nil) continue } @@ -223,7 +224,7 @@ func main() { nc := natsConf go func() { // err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx) - err := api.ReceiveNats(nc, decodeLine, 1, ctx) + err := api.ReceiveNats(nc, ms, 1, ctx) if err != nil { log.Fatal(err) } @@ -235,7 +236,7 @@ func main() { wg.Wait() log.Printf("Writing to '%s'...\n", conf.Checkpoints.RootDir) - files, err = memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) + files, err = ms.ToCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) if err != nil { log.Printf("Writing checkpoint failed: %s\n", err.Error()) } diff --git a/internal/api/api.go b/internal/api/api.go index 987f2c6..397390d 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -17,19 +17,22 @@ import ( "sync" "time" + "github.com/ClusterCockpit/cc-metric-store/internal/config" + "github.com/ClusterCockpit/cc-metric-store/internal/memorystore" + "github.com/ClusterCockpit/cc-metric-store/internal/util" "github.com/golang-jwt/jwt/v4" "github.com/gorilla/mux" "github.com/influxdata/line-protocol/v2/lineprotocol" ) type ApiMetricData struct { - Error *string `json:"error,omitempty"` - From int64 `json:"from"` - To int64 `json:"to"` - Data FloatArray `json:"data,omitempty"` - Avg Float `json:"avg"` - Min Float `json:"min"` - Max Float `json:"max"` + Error *string `json:"error,omitempty"` + From int64 `json:"from"` + To int64 `json:"to"` + Data util.FloatArray `json:"data,omitempty"` + Avg util.Float `json:"avg"` + Min util.Float `json:"min"` + Max util.Float `json:"max"` } // TODO: Optimize this, just like the stats endpoint! @@ -49,15 +52,15 @@ func (data *ApiMetricData) AddStats() { if n > 0 { avg := sum / float64(n) - data.Avg = Float(avg) - data.Min = Float(min) - data.Max = Float(max) + data.Avg = util.Float(avg) + data.Min = util.Float(min) + data.Max = util.Float(max) } else { - data.Avg, data.Min, data.Max = NaN, NaN, NaN + data.Avg, data.Min, data.Max = util.NaN, util.NaN, util.NaN } } -func (data *ApiMetricData) ScaleBy(f Float) { +func (data *ApiMetricData) ScaleBy(f util.Float) { if f == 0 || f == 1 { return } @@ -78,9 +81,9 @@ func (data *ApiMetricData) PadDataWithNull(from, to int64, metric string) { if (data.From / minfo.Frequency) > (from / minfo.Frequency) { padfront := int((data.From / minfo.Frequency) - (from / minfo.Frequency)) - ndata := make([]Float, 0, padfront+len(data.Data)) + ndata := make([]util.Float, 0, padfront+len(data.Data)) for i := 0; i < padfront; i++ { - ndata = append(ndata, NaN) + ndata = append(ndata, util.NaN) } for j := 0; j < len(data.Data); j++ { ndata = append(ndata, data.Data[j]) @@ -212,11 +215,13 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) { return } + ms := memorystore.GetMemoryStore() + response := ApiQueryResponse{ Results: make([][]ApiMetricData, 0, len(req.Queries)), } if req.ForAllNodes != nil { - nodes := memoryStore.ListChildren([]string{req.Cluster}) + nodes := ms.ListChildren([]string{req.Cluster}) for _, node := range nodes { for _, metric := range req.ForAllNodes { q := ApiQuery{ @@ -364,7 +369,7 @@ func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler }) } -func StartApiServer(ctx context.Context, httpConfig *HttpConfig) error { +func StartApiServer(ctx context.Context, httpConfig *config.HttpConfig) error { r := mux.NewRouter() r.HandleFunc("/api/free", handleFree) diff --git a/internal/api/lineprotocol.go b/internal/api/lineprotocol.go index 9814463..f48f7c3 100644 --- a/internal/api/lineprotocol.go +++ b/internal/api/lineprotocol.go @@ -10,21 +10,17 @@ import ( "time" "github.com/ClusterCockpit/cc-metric-store/internal/config" - "github.com/ClusterCockpit/cc-metric-store/internal/memstore" + "github.com/ClusterCockpit/cc-metric-store/internal/memorystore" "github.com/ClusterCockpit/cc-metric-store/internal/util" "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/nats-io/nats.go" ) -type Metric struct { - Name string - Value util.Float - mc config.MetricConfig -} - -// Currently unused, could be used to send messages via raw TCP. // Each connection is handled in it's own goroutine. This is a blocking function. -func ReceiveRaw(ctx context.Context, listener net.Listener, handleLine func(*lineprotocol.Decoder, string) error) error { +func ReceiveRaw(ctx context.Context, + listener net.Listener, + handleLine func(*lineprotocol.Decoder, string) error, +) error { var wg sync.WaitGroup wg.Add(1) @@ -86,7 +82,11 @@ func ReceiveRaw(ctx context.Context, listener net.Listener, handleLine func(*lin // Connect to a nats server and subscribe to "updates". This is a blocking // function. handleLine will be called for each line recieved via nats. // Send `true` through the done channel for gracefull termination. -func ReceiveNats(conf *config.NatsConfig, handleLine func(*lineprotocol.Decoder, string) error, workers int, ctx context.Context) error { +func ReceiveNats(conf *config.NatsConfig, + ms *memorystore.MemoryStore, + workers int, + ctx context.Context, +) error { var opts []nats.Option if conf.Username != "" && conf.Password != "" { opts = append(opts, nats.UserInfo(conf.Username, conf.Password)) @@ -113,7 +113,7 @@ func ReceiveNats(conf *config.NatsConfig, handleLine func(*lineprotocol.Decoder, go func() { for m := range msgs { dec := lineprotocol.NewDecoderWithBytes(m.Data) - if err := handleLine(dec, clusterTag); err != nil { + if err := decodeLine(dec, ms, clusterTag); err != nil { log.Printf("error: %s\n", err.Error()) } } @@ -128,7 +128,7 @@ func ReceiveNats(conf *config.NatsConfig, handleLine func(*lineprotocol.Decoder, } else { sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) { dec := lineprotocol.NewDecoderWithBytes(m.Data) - if err := handleLine(dec, clusterTag); err != nil { + if err := decodeLine(dec, ms, clusterTag); err != nil { log.Printf("error: %s\n", err.Error()) } }) @@ -177,17 +177,20 @@ func reorder(buf, prefix []byte) []byte { // Decode lines using dec and make write calls to the MemoryStore. // If a line is missing its cluster tag, use clusterDefault as default. -func decodeLine(dec *lineprotocol.Decoder, memoryStore *memstore.MemoryStore, clusterDefault string) error { +func decodeLine(dec *lineprotocol.Decoder, + ms *memorystore.MemoryStore, + clusterDefault string, +) error { // Reduce allocations in loop: t := time.Now() - metric, metricBuf := Metric{}, make([]byte, 0, 16) + metric, metricBuf := memorystore.Metric{}, make([]byte, 0, 16) selector := make([]string, 0, 4) typeBuf, subTypeBuf := make([]byte, 0, 16), make([]byte, 0) // Optimize for the case where all lines in a "batch" are about the same // cluster and host. By using `WriteToLevel` (level = host), we do not need // to take the root- and cluster-level lock as often. - var lvl *level = nil + var lvl *memorystore.Level = nil var prevCluster, prevHost string = "", "" var ok bool @@ -202,7 +205,7 @@ func decodeLine(dec *lineprotocol.Decoder, memoryStore *memstore.MemoryStore, cl metricBuf = append(metricBuf[:0], rawmeasurement...) // The go compiler optimizes map[string(byteslice)] lookups: - metric.mc, ok = memoryStore.metrics[string(rawmeasurement)] + metric.MetricConfig, ok = ms.Metrics[string(rawmeasurement)] if !ok { continue } @@ -266,7 +269,7 @@ func decodeLine(dec *lineprotocol.Decoder, memoryStore *memstore.MemoryStore, cl if lvl == nil { selector = selector[:2] selector[0], selector[1] = cluster, host - lvl = memoryStore.GetLevel(selector) + lvl = ms.GetLevel(selector) prevCluster, prevHost = cluster, host } @@ -308,7 +311,7 @@ func decodeLine(dec *lineprotocol.Decoder, memoryStore *memstore.MemoryStore, cl return err } - if err := memoryStore.WriteToLevel(lvl, selector, t.Unix(), []Metric{metric}); err != nil { + if err := ms.WriteToLevel(lvl, selector, t.Unix(), []memorystore.Metric{metric}); err != nil { return err } } diff --git a/internal/config/config.go b/internal/config/config.go index 54b64e0..0719d1f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -43,7 +43,7 @@ type MetricConfig struct { Aggregation AggregationStrategy `json:"aggregation"` // Private, used internally... - offset int + Offset int } type HttpConfig struct { diff --git a/internal/memstore/archive.go b/internal/memorystore/archive.go similarity index 92% rename from internal/memstore/archive.go rename to internal/memorystore/archive.go index e1654f7..a6fe5dc 100644 --- a/internal/memstore/archive.go +++ b/internal/memorystore/archive.go @@ -1,4 +1,4 @@ -package memstore +package memorystore import ( "archive/zip" @@ -75,7 +75,7 @@ func init() { // The good thing: Only a host at a time is locked, so this function can run // in parallel to writes/reads. func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { - levels := make([]*level, 0) + levels := make([]*Level, 0) selectors := make([][]string, 0) m.root.lock.RLock() for sel1, l1 := range m.root.children { @@ -89,7 +89,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { m.root.lock.RUnlock() type workItem struct { - level *level + level *Level dir string selector []string } @@ -136,7 +136,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { return int(n), nil } -func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) { +func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) { l.lock.RLock() defer l.lock.RUnlock() @@ -147,7 +147,7 @@ func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil Children: make(map[string]*CheckpointFile), } - for metric, minfo := range m.metrics { + for metric, minfo := range m.Metrics { b := l.metrics[minfo.offset] if b == nil { continue @@ -200,7 +200,7 @@ func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil return retval, nil } -func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { +func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { cf, err := l.toCheckpointFile(from, to, m) if err != nil { return err @@ -211,11 +211,11 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { } filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) - f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0644) + f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644) if err != nil && os.IsNotExist(err) { - err = os.MkdirAll(dir, 0755) + err = os.MkdirAll(dir, 0o755) if err == nil { - f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0644) + f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644) } } if err != nil { @@ -244,7 +244,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { go func() { defer wg.Done() for host := range work { - lvl := m.root.findLevelOrCreate(host[:], len(m.metrics)) + lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics)) nn, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m) if err != nil { log.Fatalf("error while loading checkpoints: %s", err.Error()) @@ -302,7 +302,7 @@ done: return int(n), nil } -func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error { +func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { for name, metric := range cf.Metrics { n := len(metric.Data) b := &buffer{ @@ -315,7 +315,7 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error { } b.close() - minfo, ok := m.metrics[name] + minfo, ok := m.Metrics[name] if !ok { continue // return errors.New("Unkown metric: " + name) @@ -336,14 +336,14 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error { } if len(cf.Children) > 0 && l.children == nil { - l.children = make(map[string]*level) + l.children = make(map[string]*Level) } for sel, childCf := range cf.Children { child, ok := l.children[sel] if !ok { - child = &level{ - metrics: make([]*buffer, len(m.metrics)), + child = &Level{ + metrics: make([]*buffer, len(m.Metrics)), children: nil, } l.children[sel] = child @@ -357,7 +357,7 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error { return nil } -func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) { +func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) { direntries, err := os.ReadDir(dir) if err != nil { if os.IsNotExist(err) { @@ -371,9 +371,9 @@ func (l *level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err filesLoaded := 0 for _, e := range direntries { if e.IsDir() { - child := &level{ - metrics: make([]*buffer, len(m.metrics)), - children: make(map[string]*level), + child := &Level{ + metrics: make([]*buffer, len(m.Metrics)), + children: make(map[string]*Level), } files, err := child.fromCheckpoint(path.Join(dir, e.Name()), from, m) @@ -553,11 +553,11 @@ func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead } filename := filepath.Join(archiveDir, fmt.Sprintf("%d.zip", from)) - f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644) + f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0o644) if err != nil && os.IsNotExist(err) { - err = os.MkdirAll(archiveDir, 0755) + err = os.MkdirAll(archiveDir, 0o755) if err == nil { - f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0644) + f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0o644) } } if err != nil { diff --git a/internal/memorystore/buffer.go b/internal/memorystore/buffer.go new file mode 100644 index 0000000..397be97 --- /dev/null +++ b/internal/memorystore/buffer.go @@ -0,0 +1,241 @@ +package memorystore + +import ( + "errors" + "sync" + + "github.com/ClusterCockpit/cc-metric-store/internal/util" +) + +// Default buffer capacity. +// `buffer.data` will only ever grow up to it's capacity and a new link +// in the buffer chain will be created if needed so that no copying +// of data or reallocation needs to happen on writes. +const ( + BUFFER_CAP int = 512 +) + +// So that we can reuse allocations +var bufferPool sync.Pool = sync.Pool{ + New: func() interface{} { + return &buffer{ + data: make([]util.Float, 0, BUFFER_CAP), + } + }, +} + +var ( + ErrNoData error = errors.New("no data for this metric/level") + ErrDataDoesNotAlign error = errors.New("data from lower granularities does not align") +) + +// Each metric on each level has it's own buffer. +// This is where the actual values go. +// If `cap(data)` is reached, a new buffer is created and +// becomes the new head of a buffer list. +type buffer struct { + frequency int64 // Time between two "slots" + start int64 // Timestamp of when `data[0]` was written. + data []util.Float // The slice should never reallocacte as `cap(data)` is respected. + prev, next *buffer // `prev` contains older data, `next` newer data. + archived bool // If true, this buffer is already archived + + closed bool + /* + statisticts struct { + samples int + min Float + max Float + avg Float + } + */ +} + +func newBuffer(ts, freq int64) *buffer { + b := bufferPool.Get().(*buffer) + b.frequency = freq + b.start = ts - (freq / 2) + b.prev = nil + b.next = nil + b.archived = false + b.closed = false + b.data = b.data[:0] + return b +} + +// If a new buffer was created, the new head is returnd. +// Otherwise, the existing buffer is returnd. +// Normaly, only "newer" data should be written, but if the value would +// end up in the same buffer anyways it is allowed. +func (b *buffer) write(ts int64, value util.Float) (*buffer, error) { + if ts < b.start { + return nil, errors.New("cannot write value to buffer from past") + } + + // idx := int((ts - b.start + (b.frequency / 3)) / b.frequency) + idx := int((ts - b.start) / b.frequency) + if idx >= cap(b.data) { + newbuf := newBuffer(ts, b.frequency) + newbuf.prev = b + b.next = newbuf + b.close() + b = newbuf + idx = 0 + } + + // Overwriting value or writing value from past + if idx < len(b.data) { + b.data[idx] = value + return b, nil + } + + // Fill up unwritten slots with NaN + for i := len(b.data); i < idx; i++ { + b.data = append(b.data, util.NaN) + } + + b.data = append(b.data, value) + return b, nil +} + +func (b *buffer) end() int64 { + return b.firstWrite() + int64(len(b.data))*b.frequency +} + +func (b *buffer) firstWrite() int64 { + return b.start + (b.frequency / 2) +} + +func (b *buffer) close() {} + +/* +func (b *buffer) close() { + if b.closed { + return + } + + b.closed = true + n, sum, min, max := 0, 0., math.MaxFloat64, -math.MaxFloat64 + for _, x := range b.data { + if x.IsNaN() { + continue + } + + n += 1 + f := float64(x) + sum += f + min = math.Min(min, f) + max = math.Max(max, f) + } + + b.statisticts.samples = n + if n > 0 { + b.statisticts.avg = Float(sum / float64(n)) + b.statisticts.min = Float(min) + b.statisticts.max = Float(max) + } else { + b.statisticts.avg = NaN + b.statisticts.min = NaN + b.statisticts.max = NaN + } +} +*/ + +// func interpolate(idx int, data []Float) Float { +// if idx == 0 || idx+1 == len(data) { +// return NaN +// } +// return (data[idx-1] + data[idx+1]) / 2.0 +// } + +// Return all known values from `from` to `to`. Gaps of information are represented as NaN. +// Simple linear interpolation is done between the two neighboring cells if possible. +// If values at the start or end are missing, instead of NaN values, the second and thrid +// return values contain the actual `from`/`to`. +// This function goes back the buffer chain if `from` is older than the currents buffer start. +// The loaded values are added to `data` and `data` is returned, possibly with a shorter length. +// If `data` is not long enough to hold all values, this function will panic! +func (b *buffer) read(from, to int64, data []util.Float) ([]util.Float, int64, int64, error) { + if from < b.firstWrite() { + if b.prev != nil { + return b.prev.read(from, to, data) + } + from = b.firstWrite() + } + + var i int = 0 + var t int64 = from + for ; t < to; t += b.frequency { + idx := int((t - b.start) / b.frequency) + if idx >= cap(b.data) { + if b.next == nil { + break + } + b = b.next + idx = 0 + } + + if idx >= len(b.data) { + if b.next == nil || to <= b.next.start { + break + } + data[i] += util.NaN + } else if t < b.start { + data[i] += util.NaN + // } else if b.data[idx].IsNaN() { + // data[i] += interpolate(idx, b.data) + } else { + data[i] += b.data[idx] + } + i++ + } + + return data[:i], from, t, nil +} + +// Returns true if this buffer needs to be freed. +func (b *buffer) free(t int64) (delme bool, n int) { + if b.prev != nil { + delme, m := b.prev.free(t) + n += m + if delme { + b.prev.next = nil + if cap(b.prev.data) == BUFFER_CAP { + bufferPool.Put(b.prev) + } + b.prev = nil + } + } + + end := b.end() + if end < t { + return true, n + 1 + } + + return false, n +} + +// Call `callback` on every buffer that contains data in the range from `from` to `to`. +func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) error { + if b == nil { + return nil + } + + if err := b.prev.iterFromTo(from, to, callback); err != nil { + return err + } + + if from <= b.end() && b.start <= to { + return callback(b) + } + + return nil +} + +func (b *buffer) count() int64 { + res := int64(len(b.data)) + if b.prev != nil { + res += b.prev.count() + } + return res +} diff --git a/debug.go b/internal/memorystore/debug.go similarity index 94% rename from debug.go rename to internal/memorystore/debug.go index 88af59f..59a978b 100644 --- a/debug.go +++ b/internal/memorystore/debug.go @@ -1,4 +1,4 @@ -package main +package memorystore import ( "bufio" @@ -29,7 +29,7 @@ func (b *buffer) debugDump(buf []byte) []byte { return buf } -func (l *level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf []byte, depth int) ([]byte, error) { +func (l *Level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf []byte, depth int) ([]byte, error) { l.lock.RLock() defer l.lock.RUnlock() for i := 0; i < depth; i++ { @@ -40,7 +40,7 @@ func (l *level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf [ buf = append(buf, "\":{\n"...) depth += 1 objitems := 0 - for name, mc := range m.metrics { + for name, mc := range m.Metrics { if b := l.metrics[mc.offset]; b != nil { for i := 0; i < depth; i++ { buf = append(buf, '\t') diff --git a/internal/memorystore/level.go b/internal/memorystore/level.go new file mode 100644 index 0000000..34a58a2 --- /dev/null +++ b/internal/memorystore/level.go @@ -0,0 +1,183 @@ +package memorystore + +import ( + "sync" + "unsafe" + + "github.com/ClusterCockpit/cc-metric-store/internal/util" +) + +// Could also be called "node" as this forms a node in a tree structure. +// Called Level because "node" might be confusing here. +// Can be both a leaf or a inner node. In this tree structue, inner nodes can +// also hold data (in `metrics`). +type Level struct { + lock sync.RWMutex + metrics []*buffer // Every level can store metrics. + children map[string]*Level // Lower levels. +} + +// Find the correct level for the given selector, creating it if +// it does not exist. Example selector in the context of the +// ClusterCockpit could be: []string{ "emmy", "host123", "cpu0" }. +// This function would probably benefit a lot from `level.children` beeing a `sync.Map`? +func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level { + if len(selector) == 0 { + return l + } + + // Allow concurrent reads: + l.lock.RLock() + var child *Level + var ok bool + if l.children == nil { + // Children map needs to be created... + l.lock.RUnlock() + } else { + child, ok := l.children[selector[0]] + l.lock.RUnlock() + if ok { + return child.findLevelOrCreate(selector[1:], nMetrics) + } + } + + // The level does not exist, take write lock for unqiue access: + l.lock.Lock() + // While this thread waited for the write lock, another thread + // could have created the child node. + if l.children != nil { + child, ok = l.children[selector[0]] + if ok { + l.lock.Unlock() + return child.findLevelOrCreate(selector[1:], nMetrics) + } + } + + child = &Level{ + metrics: make([]*buffer, nMetrics), + children: nil, + } + + if l.children != nil { + l.children[selector[0]] = child + } else { + l.children = map[string]*Level{selector[0]: child} + } + l.lock.Unlock() + return child.findLevelOrCreate(selector[1:], nMetrics) +} + +func (l *Level) free(t int64) (int, error) { + l.lock.Lock() + defer l.lock.Unlock() + + n := 0 + for i, b := range l.metrics { + if b != nil { + delme, m := b.free(t) + n += m + if delme { + if cap(b.data) == BUFFER_CAP { + bufferPool.Put(b) + } + l.metrics[i] = nil + } + } + } + + for _, l := range l.children { + m, err := l.free(t) + n += m + if err != nil { + return n, err + } + } + + return n, nil +} + +func (l *Level) sizeInBytes() int64 { + l.lock.RLock() + defer l.lock.RUnlock() + size := int64(0) + + for _, b := range l.metrics { + if b != nil { + size += b.count() * int64(unsafe.Sizeof(util.Float(0))) + } + } + + return size +} + +func (l *Level) findLevel(selector []string) *Level { + if len(selector) == 0 { + return l + } + + l.lock.RLock() + defer l.lock.RUnlock() + + lvl := l.children[selector[0]] + if lvl == nil { + return nil + } + + return lvl.findLevel(selector[1:]) +} + +func (l *Level) findBuffers(selector Selector, offset int, f func(b *buffer) error) error { + l.lock.RLock() + defer l.lock.RUnlock() + + if len(selector) == 0 { + b := l.metrics[offset] + if b != nil { + return f(b) + } + + for _, lvl := range l.children { + err := lvl.findBuffers(nil, offset, f) + if err != nil { + return err + } + } + return nil + } + + sel := selector[0] + if len(sel.String) != 0 && l.children != nil { + lvl, ok := l.children[sel.String] + if ok { + err := lvl.findBuffers(selector[1:], offset, f) + if err != nil { + return err + } + } + return nil + } + + if sel.Group != nil && l.children != nil { + for _, key := range sel.Group { + lvl, ok := l.children[key] + if ok { + err := lvl.findBuffers(selector[1:], offset, f) + if err != nil { + return err + } + } + } + return nil + } + + if sel.Any && l.children != nil { + for _, lvl := range l.children { + if err := lvl.findBuffers(selector[1:], offset, f); err != nil { + return err + } + } + return nil + } + + return nil +} diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go new file mode 100644 index 0000000..305ebdd --- /dev/null +++ b/internal/memorystore/memorystore.go @@ -0,0 +1,222 @@ +package memorystore + +import ( + "errors" + "log" + "sync" + + "github.com/ClusterCockpit/cc-metric-store/internal/config" + "github.com/ClusterCockpit/cc-metric-store/internal/util" +) + +var ( + singleton sync.Once + msInstance *MemoryStore +) + +type Metric struct { + Name string + Value util.Float + MetricConfig config.MetricConfig +} + +type MemoryStore struct { + root Level // root of the tree structure + Metrics map[string]config.MetricConfig +} + +// Create a new, initialized instance of a MemoryStore. +// Will panic if values in the metric configurations are invalid. +func Init(metrics map[string]config.MetricConfig) { + singleton.Do(func() { + offset := 0 + for key, cfg := range metrics { + if cfg.Frequency == 0 { + panic("invalid frequency") + } + + metrics[key] = config.MetricConfig{ + Frequency: cfg.Frequency, + Aggregation: cfg.Aggregation, + Offset: offset, + } + offset += 1 + } + + msInstance = &MemoryStore{ + root: Level{ + metrics: make([]*buffer, len(metrics)), + children: make(map[string]*Level), + }, + Metrics: metrics, + } + }) +} + +func GetMemoryStore() *MemoryStore { + if msInstance == nil { + log.Fatalf("MemoryStore not initialized!") + } + + return msInstance +} + +// Write all values in `metrics` to the level specified by `selector` for time `ts`. +// Look at `findLevelOrCreate` for how selectors work. +func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error { + var ok bool + for i, metric := range metrics { + if metric.MetricConfig.Frequency == 0 { + metric.MetricConfig, ok = m.Metrics[metric.Name] + if !ok { + metric.MetricConfig.Frequency = 0 + } + metrics[i] = metric + } + } + + return m.WriteToLevel(&m.root, selector, ts, metrics) +} + +func (m *MemoryStore) GetLevel(selector []string) *Level { + return m.root.findLevelOrCreate(selector, len(m.Metrics)) +} + +// Assumes that `minfo` in `metrics` is filled in! +func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metrics []Metric) error { + l = l.findLevelOrCreate(selector, len(m.Metrics)) + l.lock.Lock() + defer l.lock.Unlock() + + for _, metric := range metrics { + if metric.MetricConfig.Frequency == 0 { + continue + } + + b := l.metrics[metric.MetricConfig.Offset] + if b == nil { + // First write to this metric and level + b = newBuffer(ts, metric.MetricConfig.Frequency) + l.metrics[metric.MetricConfig.Offset] = b + } + + nb, err := b.write(ts, metric.Value) + if err != nil { + return err + } + + // Last write created a new buffer... + if b != nb { + l.metrics[metric.MetricConfig.Offset] = nb + } + } + return nil +} + +// Returns all values for metric `metric` from `from` to `to` for the selected level(s). +// If the level does not hold the metric itself, the data will be aggregated recursively from the children. +// The second and third return value are the actual from/to for the data. Those can be different from +// the range asked for if no data was available. +func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]util.Float, int64, int64, error) { + if from > to { + return nil, 0, 0, errors.New("invalid time range") + } + + minfo, ok := m.Metrics[metric] + if !ok { + return nil, 0, 0, errors.New("unkown metric: " + metric) + } + + n, data := 0, make([]util.Float, (to-from)/minfo.Frequency+1) + err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error { + cdata, cfrom, cto, err := b.read(from, to, data) + if err != nil { + return err + } + + if n == 0 { + from, to = cfrom, cto + } else if from != cfrom || to != cto || len(data) != len(cdata) { + missingfront, missingback := int((from-cfrom)/minfo.Frequency), int((to-cto)/minfo.Frequency) + if missingfront != 0 { + return ErrDataDoesNotAlign + } + + newlen := len(cdata) - missingback + if newlen < 1 { + return ErrDataDoesNotAlign + } + cdata = cdata[0:newlen] + if len(cdata) != len(data) { + return ErrDataDoesNotAlign + } + + from, to = cfrom, cto + } + + data = cdata + n += 1 + return nil + }) + + if err != nil { + return nil, 0, 0, err + } else if n == 0 { + return nil, 0, 0, errors.New("metric or host not found") + } else if n > 1 { + if minfo.Aggregation == config.AvgAggregation { + normalize := 1. / util.Float(n) + for i := 0; i < len(data); i++ { + data[i] *= normalize + } + } else if minfo.Aggregation != config.SumAggregation { + return nil, 0, 0, errors.New("invalid aggregation") + } + } + + return data, from, to, nil +} + +// Release all buffers for the selected level and all its children that contain only +// values older than `t`. +func (m *MemoryStore) Free(selector []string, t int64) (int, error) { + return m.GetLevel(selector).free(t) +} + +func (m *MemoryStore) FreeAll() error { + for k := range m.root.children { + delete(m.root.children, k) + } + + return nil +} + +func (m *MemoryStore) SizeInBytes() int64 { + return m.root.sizeInBytes() +} + +// Given a selector, return a list of all children of the level selected. +func (m *MemoryStore) ListChildren(selector []string) []string { + lvl := &m.root + for lvl != nil && len(selector) != 0 { + lvl.lock.RLock() + next := lvl.children[selector[0]] + lvl.lock.RUnlock() + lvl = next + selector = selector[1:] + } + + if lvl == nil { + return nil + } + + lvl.lock.RLock() + defer lvl.lock.RUnlock() + + children := make([]string, 0, len(lvl.children)) + for child := range lvl.children { + children = append(children, child) + } + + return children +} diff --git a/internal/memorystore/selector.go b/internal/memorystore/selector.go new file mode 100644 index 0000000..0b24300 --- /dev/null +++ b/internal/memorystore/selector.go @@ -0,0 +1,51 @@ +package memorystore + +import ( + "encoding/json" + "errors" +) + +type SelectorElement struct { + Any bool + String string + Group []string +} + +func (se *SelectorElement) UnmarshalJSON(input []byte) error { + if input[0] == '"' { + if err := json.Unmarshal(input, &se.String); err != nil { + return err + } + + if se.String == "*" { + se.Any = true + se.String = "" + } + + return nil + } + + if input[0] == '[' { + return json.Unmarshal(input, &se.Group) + } + + return errors.New("the Go SelectorElement type can only be a string or an array of strings") +} + +func (se *SelectorElement) MarshalJSON() ([]byte, error) { + if se.Any { + return []byte("\"*\""), nil + } + + if se.String != "" { + return json.Marshal(se.String) + } + + if se.Group != nil { + return json.Marshal(se.Group) + } + + return nil, errors.New("a Go Selector must be a non-empty string or a non-empty slice of strings") +} + +type Selector []SelectorElement diff --git a/internal/util/stats.go b/internal/memorystore/stats.go similarity index 75% rename from internal/util/stats.go rename to internal/memorystore/stats.go index 8e0f41f..3240d02 100644 --- a/internal/util/stats.go +++ b/internal/memorystore/stats.go @@ -1,15 +1,18 @@ -package util +package memorystore import ( "errors" "math" + + "github.com/ClusterCockpit/cc-metric-store/internal/config" + "github.com/ClusterCockpit/cc-metric-store/internal/util" ) type Stats struct { Samples int - Avg Float - Min Float - Max Float + Avg util.Float + Min util.Float + Max util.Float } func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) { @@ -54,9 +57,9 @@ func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) { return Stats{ Samples: samples, - Avg: Float(sum) / Float(samples), - Min: Float(min), - Max: Float(max), + Avg: util.Float(sum) / util.Float(samples), + Min: util.Float(min), + Max: util.Float(max), }, from, t, nil } @@ -68,14 +71,14 @@ func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (* return nil, 0, 0, errors.New("invalid time range") } - minfo, ok := m.metrics[metric] + minfo, ok := m.Metrics[metric] if !ok { return nil, 0, 0, errors.New("unkown metric: " + metric) } n, samples := 0, 0 - avg, min, max := Float(0), math.MaxFloat32, -math.MaxFloat32 - err := m.root.findBuffers(selector, minfo.offset, func(b *buffer) error { + avg, min, max := util.Float(0), math.MaxFloat32, -math.MaxFloat32 + err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error { stats, cfrom, cto, err := b.stats(from, to) if err != nil { return err @@ -102,16 +105,16 @@ func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (* return nil, 0, 0, ErrNoData } - if minfo.Aggregation == AvgAggregation { - avg /= Float(n) - } else if n > 1 && minfo.Aggregation != SumAggregation { + if minfo.Aggregation == config.AvgAggregation { + avg /= util.Float(n) + } else if n > 1 && minfo.Aggregation != config.SumAggregation { return nil, 0, 0, errors.New("invalid aggregation") } return &Stats{ Samples: samples, Avg: avg, - Min: Float(min), - Max: Float(max), + Min: util.Float(min), + Max: util.Float(max), }, from, to, nil } diff --git a/internal/memstore/memstore.go b/internal/memstore/memstore.go deleted file mode 100644 index 99b6295..0000000 --- a/internal/memstore/memstore.go +++ /dev/null @@ -1,542 +0,0 @@ -package memstore - -import ( - "errors" - "sync" - "unsafe" - - "github.com/ClusterCockpit/cc-metric-store/internal/api" - "github.com/ClusterCockpit/cc-metric-store/internal/config" - "github.com/ClusterCockpit/cc-metric-store/internal/util" -) - -// Default buffer capacity. -// `buffer.data` will only ever grow up to it's capacity and a new link -// in the buffer chain will be created if needed so that no copying -// of data or reallocation needs to happen on writes. -const ( - BUFFER_CAP int = 512 -) - -// So that we can reuse allocations -var bufferPool sync.Pool = sync.Pool{ - New: func() interface{} { - return &buffer{ - data: make([]util.Float, 0, BUFFER_CAP), - } - }, -} - -var ( - ErrNoData error = errors.New("no data for this metric/level") - ErrDataDoesNotAlign error = errors.New("data from lower granularities does not align") -) - -// Each metric on each level has it's own buffer. -// This is where the actual values go. -// If `cap(data)` is reached, a new buffer is created and -// becomes the new head of a buffer list. -type buffer struct { - frequency int64 // Time between two "slots" - start int64 // Timestamp of when `data[0]` was written. - data []util.Float // The slice should never reallocacte as `cap(data)` is respected. - prev, next *buffer // `prev` contains older data, `next` newer data. - archived bool // If true, this buffer is already archived - - closed bool - /* - statisticts struct { - samples int - min Float - max Float - avg Float - } - */ -} - -func newBuffer(ts, freq int64) *buffer { - b := bufferPool.Get().(*buffer) - b.frequency = freq - b.start = ts - (freq / 2) - b.prev = nil - b.next = nil - b.archived = false - b.closed = false - b.data = b.data[:0] - return b -} - -// If a new buffer was created, the new head is returnd. -// Otherwise, the existing buffer is returnd. -// Normaly, only "newer" data should be written, but if the value would -// end up in the same buffer anyways it is allowed. -func (b *buffer) write(ts int64, value util.Float) (*buffer, error) { - if ts < b.start { - return nil, errors.New("cannot write value to buffer from past") - } - - // idx := int((ts - b.start + (b.frequency / 3)) / b.frequency) - idx := int((ts - b.start) / b.frequency) - if idx >= cap(b.data) { - newbuf := newBuffer(ts, b.frequency) - newbuf.prev = b - b.next = newbuf - b.close() - b = newbuf - idx = 0 - } - - // Overwriting value or writing value from past - if idx < len(b.data) { - b.data[idx] = value - return b, nil - } - - // Fill up unwritten slots with NaN - for i := len(b.data); i < idx; i++ { - b.data = append(b.data, util.NaN) - } - - b.data = append(b.data, value) - return b, nil -} - -func (b *buffer) end() int64 { - return b.firstWrite() + int64(len(b.data))*b.frequency -} - -func (b *buffer) firstWrite() int64 { - return b.start + (b.frequency / 2) -} - -func (b *buffer) close() {} - -/* -func (b *buffer) close() { - if b.closed { - return - } - - b.closed = true - n, sum, min, max := 0, 0., math.MaxFloat64, -math.MaxFloat64 - for _, x := range b.data { - if x.IsNaN() { - continue - } - - n += 1 - f := float64(x) - sum += f - min = math.Min(min, f) - max = math.Max(max, f) - } - - b.statisticts.samples = n - if n > 0 { - b.statisticts.avg = Float(sum / float64(n)) - b.statisticts.min = Float(min) - b.statisticts.max = Float(max) - } else { - b.statisticts.avg = NaN - b.statisticts.min = NaN - b.statisticts.max = NaN - } -} -*/ - -// func interpolate(idx int, data []Float) Float { -// if idx == 0 || idx+1 == len(data) { -// return NaN -// } -// return (data[idx-1] + data[idx+1]) / 2.0 -// } - -// Return all known values from `from` to `to`. Gaps of information are represented as NaN. -// Simple linear interpolation is done between the two neighboring cells if possible. -// If values at the start or end are missing, instead of NaN values, the second and thrid -// return values contain the actual `from`/`to`. -// This function goes back the buffer chain if `from` is older than the currents buffer start. -// The loaded values are added to `data` and `data` is returned, possibly with a shorter length. -// If `data` is not long enough to hold all values, this function will panic! -func (b *buffer) read(from, to int64, data []util.Float) ([]util.Float, int64, int64, error) { - if from < b.firstWrite() { - if b.prev != nil { - return b.prev.read(from, to, data) - } - from = b.firstWrite() - } - - var i int = 0 - var t int64 = from - for ; t < to; t += b.frequency { - idx := int((t - b.start) / b.frequency) - if idx >= cap(b.data) { - if b.next == nil { - break - } - b = b.next - idx = 0 - } - - if idx >= len(b.data) { - if b.next == nil || to <= b.next.start { - break - } - data[i] += util.NaN - } else if t < b.start { - data[i] += util.NaN - // } else if b.data[idx].IsNaN() { - // data[i] += interpolate(idx, b.data) - } else { - data[i] += b.data[idx] - } - i++ - } - - return data[:i], from, t, nil -} - -// Returns true if this buffer needs to be freed. -func (b *buffer) free(t int64) (delme bool, n int) { - if b.prev != nil { - delme, m := b.prev.free(t) - n += m - if delme { - b.prev.next = nil - if cap(b.prev.data) == BUFFER_CAP { - bufferPool.Put(b.prev) - } - b.prev = nil - } - } - - end := b.end() - if end < t { - return true, n + 1 - } - - return false, n -} - -// Call `callback` on every buffer that contains data in the range from `from` to `to`. -func (b *buffer) iterFromTo(from, to int64, callback func(b *buffer) error) error { - if b == nil { - return nil - } - - if err := b.prev.iterFromTo(from, to, callback); err != nil { - return err - } - - if from <= b.end() && b.start <= to { - return callback(b) - } - - return nil -} - -func (b *buffer) count() int64 { - res := int64(len(b.data)) - if b.prev != nil { - res += b.prev.count() - } - return res -} - -// Could also be called "node" as this forms a node in a tree structure. -// Called level because "node" might be confusing here. -// Can be both a leaf or a inner node. In this tree structue, inner nodes can -// also hold data (in `metrics`). -type level struct { - lock sync.RWMutex - metrics []*buffer // Every level can store metrics. - children map[string]*level // Lower levels. -} - -// Find the correct level for the given selector, creating it if -// it does not exist. Example selector in the context of the -// ClusterCockpit could be: []string{ "emmy", "host123", "cpu0" }. -// This function would probably benefit a lot from `level.children` beeing a `sync.Map`? -func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level { - if len(selector) == 0 { - return l - } - - // Allow concurrent reads: - l.lock.RLock() - var child *level - var ok bool - if l.children == nil { - // Children map needs to be created... - l.lock.RUnlock() - } else { - child, ok := l.children[selector[0]] - l.lock.RUnlock() - if ok { - return child.findLevelOrCreate(selector[1:], nMetrics) - } - } - - // The level does not exist, take write lock for unqiue access: - l.lock.Lock() - // While this thread waited for the write lock, another thread - // could have created the child node. - if l.children != nil { - child, ok = l.children[selector[0]] - if ok { - l.lock.Unlock() - return child.findLevelOrCreate(selector[1:], nMetrics) - } - } - - child = &level{ - metrics: make([]*buffer, nMetrics), - children: nil, - } - - if l.children != nil { - l.children[selector[0]] = child - } else { - l.children = map[string]*level{selector[0]: child} - } - l.lock.Unlock() - return child.findLevelOrCreate(selector[1:], nMetrics) -} - -func (l *level) free(t int64) (int, error) { - l.lock.Lock() - defer l.lock.Unlock() - - n := 0 - for i, b := range l.metrics { - if b != nil { - delme, m := b.free(t) - n += m - if delme { - if cap(b.data) == BUFFER_CAP { - bufferPool.Put(b) - } - l.metrics[i] = nil - } - } - } - - for _, l := range l.children { - m, err := l.free(t) - n += m - if err != nil { - return n, err - } - } - - return n, nil -} - -func (l *level) sizeInBytes() int64 { - l.lock.RLock() - defer l.lock.RUnlock() - size := int64(0) - - for _, b := range l.metrics { - if b != nil { - size += b.count() * int64(unsafe.Sizeof(util.Float(0))) - } - } - - for _, child := range l.children { - size += child.sizeInBytes() - } - - return size -} - -type MemoryStore struct { - root level // root of the tree structure - metrics map[string]config.MetricConfig -} - -// Return a new, initialized instance of a MemoryStore. -// Will panic if values in the metric configurations are invalid. -func NewMemoryStore(metrics map[string]config.MetricConfig) *MemoryStore { - offset := 0 - for key, config := range metrics { - if config.Frequency == 0 { - panic("invalid frequency") - } - - metrics[key] = MetricConfig{ - Frequency: config.Frequency, - Aggregation: config.Aggregation, - offset: offset, - } - offset += 1 - } - - return &MemoryStore{ - root: level{ - metrics: make([]*buffer, len(metrics)), - children: make(map[string]*level), - }, - metrics: metrics, - } -} - -// Write all values in `metrics` to the level specified by `selector` for time `ts`. -// Look at `findLevelOrCreate` for how selectors work. -func (m *MemoryStore) Write(selector []string, ts int64, metrics []api.Metric) error { - var ok bool - for i, metric := range metrics { - if metric.mc.Frequency == 0 { - metric.mc, ok = m.metrics[metric.Name] - if !ok { - metric.mc.Frequency = 0 - } - metrics[i] = metric - } - } - - return m.WriteToLevel(&m.root, selector, ts, metrics) -} - -func (m *MemoryStore) GetLevel(selector []string) *level { - return m.root.findLevelOrCreate(selector, len(m.metrics)) -} - -// Assumes that `minfo` in `metrics` is filled in! -func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metrics []api.Metric) error { - l = l.findLevelOrCreate(selector, len(m.metrics)) - l.lock.Lock() - defer l.lock.Unlock() - - for _, metric := range metrics { - if metric.mc.Frequency == 0 { - continue - } - - b := l.metrics[metric.mc.offset] - if b == nil { - // First write to this metric and level - b = newBuffer(ts, metric.mc.Frequency) - l.metrics[metric.mc.offset] = b - } - - nb, err := b.write(ts, metric.Value) - if err != nil { - return err - } - - // Last write created a new buffer... - if b != nb { - l.metrics[metric.mc.offset] = nb - } - } - return nil -} - -// Returns all values for metric `metric` from `from` to `to` for the selected level(s). -// If the level does not hold the metric itself, the data will be aggregated recursively from the children. -// The second and third return value are the actual from/to for the data. Those can be different from -// the range asked for if no data was available. -func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]Float, int64, int64, error) { - if from > to { - return nil, 0, 0, errors.New("invalid time range") - } - - minfo, ok := m.metrics[metric] - if !ok { - return nil, 0, 0, errors.New("unkown metric: " + metric) - } - - n, data := 0, make([]Float, (to-from)/minfo.Frequency+1) - err := m.root.findBuffers(selector, minfo.offset, func(b *buffer) error { - cdata, cfrom, cto, err := b.read(from, to, data) - if err != nil { - return err - } - - if n == 0 { - from, to = cfrom, cto - } else if from != cfrom || to != cto || len(data) != len(cdata) { - missingfront, missingback := int((from-cfrom)/minfo.Frequency), int((to-cto)/minfo.Frequency) - if missingfront != 0 { - return ErrDataDoesNotAlign - } - - newlen := len(cdata) - missingback - if newlen < 1 { - return ErrDataDoesNotAlign - } - cdata = cdata[0:newlen] - if len(cdata) != len(data) { - return ErrDataDoesNotAlign - } - - from, to = cfrom, cto - } - - data = cdata - n += 1 - return nil - }) - - if err != nil { - return nil, 0, 0, err - } else if n == 0 { - return nil, 0, 0, errors.New("metric or host not found") - } else if n > 1 { - if minfo.Aggregation == AvgAggregation { - normalize := 1. / Float(n) - for i := 0; i < len(data); i++ { - data[i] *= normalize - } - } else if minfo.Aggregation != SumAggregation { - return nil, 0, 0, errors.New("invalid aggregation") - } - } - - return data, from, to, nil -} - -// Release all buffers for the selected level and all its children that contain only -// values older than `t`. -func (m *MemoryStore) Free(selector []string, t int64) (int, error) { - return m.GetLevel(selector).free(t) -} - -func (m *MemoryStore) FreeAll() error { - for k := range m.root.children { - delete(m.root.children, k) - } - - return nil -} - -func (m *MemoryStore) SizeInBytes() int64 { - return m.root.sizeInBytes() -} - -// Given a selector, return a list of all children of the level selected. -func (m *MemoryStore) ListChildren(selector []string) []string { - lvl := &m.root - for lvl != nil && len(selector) != 0 { - lvl.lock.RLock() - next := lvl.children[selector[0]] - lvl.lock.RUnlock() - lvl = next - selector = selector[1:] - } - - if lvl == nil { - return nil - } - - lvl.lock.RLock() - defer lvl.lock.RUnlock() - - children := make([]string, 0, len(lvl.children)) - for child := range lvl.children { - children = append(children, child) - } - - return children -} diff --git a/internal/memstore/selector.go b/internal/memstore/selector.go deleted file mode 100644 index 7bc498a..0000000 --- a/internal/memstore/selector.go +++ /dev/null @@ -1,123 +0,0 @@ -package memstore - -import ( - "encoding/json" - "errors" -) - -type SelectorElement struct { - Any bool - String string - Group []string -} - -func (se *SelectorElement) UnmarshalJSON(input []byte) error { - if input[0] == '"' { - if err := json.Unmarshal(input, &se.String); err != nil { - return err - } - - if se.String == "*" { - se.Any = true - se.String = "" - } - - return nil - } - - if input[0] == '[' { - return json.Unmarshal(input, &se.Group) - } - - return errors.New("the Go SelectorElement type can only be a string or an array of strings") -} - -func (se *SelectorElement) MarshalJSON() ([]byte, error) { - if se.Any { - return []byte("\"*\""), nil - } - - if se.String != "" { - return json.Marshal(se.String) - } - - if se.Group != nil { - return json.Marshal(se.Group) - } - - return nil, errors.New("a Go Selector must be a non-empty string or a non-empty slice of strings") -} - -type Selector []SelectorElement - -func (l *level) findLevel(selector []string) *level { - if len(selector) == 0 { - return l - } - - l.lock.RLock() - defer l.lock.RUnlock() - - lvl := l.children[selector[0]] - if lvl == nil { - return nil - } - - return lvl.findLevel(selector[1:]) -} - -func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) error) error { - l.lock.RLock() - defer l.lock.RUnlock() - - if len(selector) == 0 { - b := l.metrics[offset] - if b != nil { - return f(b) - } - - for _, lvl := range l.children { - err := lvl.findBuffers(nil, offset, f) - if err != nil { - return err - } - } - return nil - } - - sel := selector[0] - if len(sel.String) != 0 && l.children != nil { - lvl, ok := l.children[sel.String] - if ok { - err := lvl.findBuffers(selector[1:], offset, f) - if err != nil { - return err - } - } - return nil - } - - if sel.Group != nil && l.children != nil { - for _, key := range sel.Group { - lvl, ok := l.children[key] - if ok { - err := lvl.findBuffers(selector[1:], offset, f) - if err != nil { - return err - } - } - } - return nil - } - - if sel.Any && l.children != nil { - for _, lvl := range l.children { - if err := lvl.findBuffers(selector[1:], offset, f); err != nil { - return err - } - } - return nil - } - - return nil -}