From fcc8eac2d56284989736d2630f746c80f175b528 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 6 May 2024 14:20:43 +0200 Subject: [PATCH] Restructure and Cleanup Compiles --- cmd/cc-metric-store/main.go | 162 +------ internal/api/api.go | 117 +++-- internal/api/lineprotocol.go | 2 +- internal/config/config.go | 8 +- internal/memorystore/archive.go | 481 ++------------------ internal/memorystore/buffer.go | 26 +- internal/memorystore/checkpoint.go | 501 +++++++++++++++++++++ internal/memorystore/debug.go | 2 +- internal/memorystore/level.go | 6 +- internal/memorystore/memorystore.go | 65 ++- internal/memorystore/stats.go | 2 +- internal/{memorystore => util}/selector.go | 4 +- 12 files changed, 686 insertions(+), 690 deletions(-) create mode 100644 internal/memorystore/checkpoint.go rename internal/{memorystore => util}/selector.go (97%) diff --git a/cmd/cc-metric-store/main.go b/cmd/cc-metric-store/main.go index 601dfe2..c7f3c03 100644 --- a/cmd/cc-metric-store/main.go +++ b/cmd/cc-metric-store/main.go @@ -4,7 +4,6 @@ import ( "bufio" "context" "flag" - "io" "log" "os" "os/signal" @@ -20,120 +19,6 @@ import ( "github.com/google/gops/agent" ) -var ( - conf config.Config - ms *memorystore.MemoryStore = nil - lastCheckpoint time.Time -) - -var ( - debugDumpLock sync.Mutex - debugDump io.Writer = io.Discard -) - -func intervals(wg *sync.WaitGroup, ctx context.Context) { - wg.Add(3) - // go func() { - // defer wg.Done() - // ticks := time.Tick(30 * time.Minute) - // for { - // select { - // case <-ctx.Done(): - // return - // case <-ticks: - // runtime.GC() - // } - // } - // }() - - go func() { - defer wg.Done() - d, err := time.ParseDuration(conf.RetentionInMemory) - if err != nil { - log.Fatal(err) - } - if d <= 0 { - return - } - - ticks := time.Tick(d / 2) - for { - select { - case <-ctx.Done(): - return - case <-ticks: - t := time.Now().Add(-d) - log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) - freed, err := ms.Free(nil, t.Unix()) - if err != nil { - log.Printf("freeing up buffers failed: %s\n", err.Error()) - } else { - log.Printf("done: %d buffers freed\n", freed) - } - } - } - }() - - lastCheckpoint = time.Now() - go func() { - defer wg.Done() - d, err := time.ParseDuration(conf.Checkpoints.Interval) - if err != nil { - log.Fatal(err) - } - if d <= 0 { - return - } - - ticks := time.Tick(d) - for { - select { - case <-ctx.Done(): - return - case <-ticks: - log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339)) - now := time.Now() - n, err := ms.ToCheckpoint(conf.Checkpoints.RootDir, - lastCheckpoint.Unix(), now.Unix()) - if err != nil { - log.Printf("checkpointing failed: %s\n", err.Error()) - } else { - log.Printf("done: %d checkpoint files created\n", n) - lastCheckpoint = now - } - } - } - }() - - go func() { - defer wg.Done() - d, err := time.ParseDuration(conf.Archive.Interval) - if err != nil { - log.Fatal(err) - } - if d <= 0 { - return - } - - ticks := time.Tick(d) - for { - select { - case <-ctx.Done(): - return - case <-ticks: - t := time.Now().Add(-d) - log.Printf("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339)) - n, err := memorystore.ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix(), conf.Archive.DeleteInstead) - if err != nil { - log.Printf("archiving failed: %s\n", err.Error()) - } else { - log.Printf("done: %d files zipped and moved to archive\n", n) - } - } - } - }() -} - func main() { var configFile string var enableGopsAgent bool @@ -142,33 +27,24 @@ func main() { flag.Parse() startupTime := time.Now() - conf = config.LoadConfiguration(configFile) - memorystore.Init(conf.Metrics) - ms = memorystore.GetMemoryStore() + config.Init(configFile) + memorystore.Init(config.Keys.Metrics) + ms := memorystore.GetMemoryStore() - if enableGopsAgent || conf.Debug.EnableGops { + if enableGopsAgent || config.Keys.Debug.EnableGops { if err := agent.Listen(agent.Options{}); err != nil { log.Fatal(err) } } - if conf.Debug.DumpToFile != "" { - f, err := os.Create(conf.Debug.DumpToFile) - if err != nil { - log.Fatal(err) - } - - debugDump = f - } - - d, err := time.ParseDuration(conf.Checkpoints.Restore) + d, err := time.ParseDuration(config.Keys.Checkpoints.Restore) if err != nil { log.Fatal(err) } restoreFrom := startupTime.Add(-d) log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339)) - files, err := ms.FromCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix()) + files, err := ms.FromCheckpoint(config.Keys.Checkpoints.RootDir, restoreFrom.Unix()) loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB if err != nil { log.Fatalf("Loading checkpoints failed: %s\n", err.Error()) @@ -205,20 +81,24 @@ func main() { } }() - intervals(&wg, ctx) + wg.Add(3) + + memorystore.Retention(&wg, ctx) + memorystore.Checkpointing(&wg, ctx) + memorystore.Archiving(&wg, ctx) wg.Add(1) go func() { - err := api.StartApiServer(ctx, conf.HttpConfig) + err := api.StartApiServer(ctx, config.Keys.HttpConfig) if err != nil { log.Fatal(err) } wg.Done() }() - if conf.Nats != nil { - for _, natsConf := range conf.Nats { + if config.Keys.Nats != nil { + for _, natsConf := range config.Keys.Nats { // TODO: When multiple nats configs share a URL, do a single connect. wg.Add(1) nc := natsConf @@ -234,17 +114,5 @@ func main() { } wg.Wait() - - log.Printf("Writing to '%s'...\n", conf.Checkpoints.RootDir) - files, err = ms.ToCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) - if err != nil { - log.Printf("Writing checkpoint failed: %s\n", err.Error()) - } - log.Printf("Done! (%d files written)\n", files) - - if closer, ok := debugDump.(io.Closer); ok { - if err := closer.Close(); err != nil { - log.Printf("error: %s", err.Error()) - } - } + memorystore.Shutdown() } diff --git a/internal/api/api.go b/internal/api/api.go index 397390d..73b513d 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -27,9 +27,9 @@ import ( type ApiMetricData struct { Error *string `json:"error,omitempty"` + Data util.FloatArray `json:"data,omitempty"` From int64 `json:"from"` To int64 `json:"to"` - Data util.FloatArray `json:"data,omitempty"` Avg util.Float `json:"avg"` Min util.Float `json:"min"` Max util.Float `json:"max"` @@ -73,8 +73,8 @@ func (data *ApiMetricData) ScaleBy(f util.Float) { } } -func (data *ApiMetricData) PadDataWithNull(from, to int64, metric string) { - minfo, ok := memoryStore.metrics[metric] +func (data *ApiMetricData) PadDataWithNull(ms *memorystore.MemoryStore, from, to int64, metric string) { + minfo, ok := ms.Metrics[metric] if !ok { return } @@ -105,12 +105,12 @@ func handleFree(rw http.ResponseWriter, r *http.Request) { return } - // TODO: lastCheckpoint might be modified by different go-routines. - // Load it using the sync/atomic package? - freeUpTo := lastCheckpoint.Unix() - if to < freeUpTo { - freeUpTo = to - } + // // TODO: lastCheckpoint might be modified by different go-routines. + // // Load it using the sync/atomic package? + // freeUpTo := lastCheckpoint.Unix() + // if to < freeUpTo { + // freeUpTo = to + // } if r.Method != http.MethodPost { http.Error(rw, "Method Not Allowed", http.StatusMethodNotAllowed) @@ -125,9 +125,10 @@ func handleFree(rw http.ResponseWriter, r *http.Request) { return } + ms := memorystore.GetMemoryStore() n := 0 for _, sel := range selectors { - bn, err := memoryStore.Free(sel, freeUpTo) + bn, err := ms.Free(sel, to) if err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) return @@ -137,7 +138,7 @@ func handleFree(rw http.ResponseWriter, r *http.Request) { } rw.WriteHeader(http.StatusOK) - rw.Write([]byte(fmt.Sprintf("buffers freed: %d\n", n))) + fmt.Fprintf(rw, "buffers freed: %d\n", n) } func handleWrite(rw http.ResponseWriter, r *http.Request) { @@ -153,26 +154,9 @@ func handleWrite(rw http.ResponseWriter, r *http.Request) { return } - if debugDump != io.Discard { - now := time.Now() - msg := make([]byte, 0, 512) - msg = append(msg, "\n--- local unix time: "...) - msg = strconv.AppendInt(msg, now.Unix(), 10) - msg = append(msg, " ---\n"...) - - debugDumpLock.Lock() - defer debugDumpLock.Unlock() - if _, err := debugDump.Write(msg); err != nil { - log.Printf("error while writing to debug dump: %s", err.Error()) - } - if _, err := debugDump.Write(bytes); err != nil { - log.Printf("error while writing to debug dump: %s", err.Error()) - } - return - } - + ms := memorystore.GetMemoryStore() dec := lineprotocol.NewDecoderWithBytes(bytes) - if err := decodeLine(dec, r.URL.Query().Get("cluster")); err != nil { + if err := decodeLine(dec, ms, r.URL.Query().Get("cluster")); err != nil { log.Printf("/api/write error: %s", err.Error()) http.Error(rw, err.Error(), http.StatusBadRequest) return @@ -182,13 +166,13 @@ func handleWrite(rw http.ResponseWriter, r *http.Request) { type ApiQueryRequest struct { Cluster string `json:"cluster"` + Queries []ApiQuery `json:"queries"` + ForAllNodes []string `json:"for-all-nodes"` From int64 `json:"from"` To int64 `json:"to"` WithStats bool `json:"with-stats"` WithData bool `json:"with-data"` WithPadding bool `json:"with-padding"` - Queries []ApiQuery `json:"queries"` - ForAllNodes []string `json:"for-all-nodes"` } type ApiQueryResponse struct { @@ -197,19 +181,19 @@ type ApiQueryResponse struct { } type ApiQuery struct { - Metric string `json:"metric"` - Hostname string `json:"host"` - Aggregate bool `json:"aggreg"` - ScaleFactor Float `json:"scale-by,omitempty"` - Type *string `json:"type,omitempty"` - TypeIds []string `json:"type-ids,omitempty"` - SubType *string `json:"subtype,omitempty"` - SubTypeIds []string `json:"subtype-ids,omitempty"` + Type *string `json:"type,omitempty"` + SubType *string `json:"subtype,omitempty"` + Metric string `json:"metric"` + Hostname string `json:"host"` + TypeIds []string `json:"type-ids,omitempty"` + SubTypeIds []string `json:"subtype-ids,omitempty"` + ScaleFactor util.Float `json:"scale-by,omitempty"` + Aggregate bool `json:"aggreg"` } func handleQuery(rw http.ResponseWriter, r *http.Request) { var err error - var req ApiQueryRequest = ApiQueryRequest{WithStats: true, WithData: true, WithPadding: true} + req := ApiQueryRequest{WithStats: true, WithData: true, WithPadding: true} if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) return @@ -235,29 +219,29 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) { } for _, query := range req.Queries { - sels := make([]Selector, 0, 1) + sels := make([]util.Selector, 0, 1) if query.Aggregate || query.Type == nil { - sel := Selector{{String: req.Cluster}, {String: query.Hostname}} + sel := util.Selector{{String: req.Cluster}, {String: query.Hostname}} if query.Type != nil { if len(query.TypeIds) == 1 { - sel = append(sel, SelectorElement{String: *query.Type + query.TypeIds[0]}) + sel = append(sel, util.SelectorElement{String: *query.Type + query.TypeIds[0]}) } else { ids := make([]string, len(query.TypeIds)) for i, id := range query.TypeIds { ids[i] = *query.Type + id } - sel = append(sel, SelectorElement{Group: ids}) + sel = append(sel, util.SelectorElement{Group: ids}) } if query.SubType != nil { if len(query.SubTypeIds) == 1 { - sel = append(sel, SelectorElement{String: *query.SubType + query.SubTypeIds[0]}) + sel = append(sel, util.SelectorElement{String: *query.SubType + query.SubTypeIds[0]}) } else { ids := make([]string, len(query.SubTypeIds)) for i, id := range query.SubTypeIds { ids[i] = *query.SubType + id } - sel = append(sel, SelectorElement{Group: ids}) + sel = append(sel, util.SelectorElement{Group: ids}) } } } @@ -266,7 +250,7 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) { for _, typeId := range query.TypeIds { if query.SubType != nil { for _, subTypeId := range query.SubTypeIds { - sels = append(sels, Selector{ + sels = append(sels, util.Selector{ {String: req.Cluster}, {String: query.Hostname}, {String: *query.Type + typeId}, @@ -274,7 +258,7 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) { }) } } else { - sels = append(sels, Selector{ + sels = append(sels, util.Selector{ {String: req.Cluster}, {String: query.Hostname}, {String: *query.Type + typeId}, @@ -289,7 +273,7 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) { res := make([]ApiMetricData, 0, len(sels)) for _, sel := range sels { data := ApiMetricData{} - data.Data, data.From, data.To, err = memoryStore.Read(sel, query.Metric, req.From, req.To) + data.Data, data.From, data.To, err = ms.Read(sel, query.Metric, req.From, req.To) // log.Printf("data: %#v, %#v, %#v, %#v", data.Data, data.From, data.To, err) if err != nil { msg := err.Error() @@ -305,7 +289,7 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) { data.ScaleBy(query.ScaleFactor) } if req.WithPadding { - data.PadDataWithNull(req.From, req.To, query.Metric) + data.PadDataWithNull(ms, req.From, req.To, query.Metric) } if !req.WithData { data.Data = nil @@ -324,6 +308,20 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) { } } +func handleDebug(rw http.ResponseWriter, r *http.Request) { + raw := r.URL.Query().Get("selector") + selector := []string{} + if len(raw) != 0 { + selector = strings.Split(raw, ":") + } + + ms := memorystore.GetMemoryStore() + if err := ms.DebugDump(bufio.NewWriter(rw), selector); err != nil { + rw.WriteHeader(http.StatusBadRequest) + rw.Write([]byte(err.Error())) + } +} + func authentication(next http.Handler, publicKey ed25519.PublicKey) http.Handler { cacheLock := sync.RWMutex{} cache := map[string]*jwt.Token{} @@ -375,18 +373,7 @@ func StartApiServer(ctx context.Context, httpConfig *config.HttpConfig) error { r.HandleFunc("/api/free", handleFree) r.HandleFunc("/api/write", handleWrite) r.HandleFunc("/api/query", handleQuery) - r.HandleFunc("/api/debug", func(rw http.ResponseWriter, r *http.Request) { - raw := r.URL.Query().Get("selector") - selector := []string{} - if len(raw) != 0 { - selector = strings.Split(raw, ":") - } - - if err := memoryStore.DebugDump(bufio.NewWriter(rw), selector); err != nil { - rw.WriteHeader(http.StatusBadRequest) - rw.Write([]byte(err.Error())) - } - }) + r.HandleFunc("/api/debug", handleDebug) server := &http.Server{ Handler: r, @@ -395,8 +382,8 @@ func StartApiServer(ctx context.Context, httpConfig *config.HttpConfig) error { ReadTimeout: 30 * time.Second, } - if len(conf.JwtPublicKey) > 0 { - buf, err := base64.StdEncoding.DecodeString(conf.JwtPublicKey) + if len(config.Keys.JwtPublicKey) > 0 { + buf, err := base64.StdEncoding.DecodeString(config.Keys.JwtPublicKey) if err != nil { return err } diff --git a/internal/api/lineprotocol.go b/internal/api/lineprotocol.go index f48f7c3..9081638 100644 --- a/internal/api/lineprotocol.go +++ b/internal/api/lineprotocol.go @@ -191,7 +191,7 @@ func decodeLine(dec *lineprotocol.Decoder, // cluster and host. By using `WriteToLevel` (level = host), we do not need // to take the root- and cluster-level lock as often. var lvl *memorystore.Level = nil - var prevCluster, prevHost string = "", "" + prevCluster, prevHost := "", "" var ok bool for dec.Next() { diff --git a/internal/config/config.go b/internal/config/config.go index 0719d1f..b829b5d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -96,8 +96,9 @@ type Config struct { Nats []*NatsConfig `json:"nats"` } -func LoadConfiguration(file string) Config { - var config Config +var Keys Config + +func Init(file string) { configFile, err := os.Open(file) if err != nil { log.Fatal(err) @@ -105,8 +106,7 @@ func LoadConfiguration(file string) Config { defer configFile.Close() dec := json.NewDecoder(configFile) dec.DisallowUnknownFields() - if err := dec.Decode(&config); err != nil { + if err := dec.Decode(&Keys); err != nil { log.Fatal(err) } - return config } diff --git a/internal/memorystore/archive.go b/internal/memorystore/archive.go index a6fe5dc..fab457c 100644 --- a/internal/memorystore/archive.go +++ b/internal/memorystore/archive.go @@ -3,470 +3,57 @@ package memorystore import ( "archive/zip" "bufio" - "encoding/json" + "context" "errors" "fmt" "io" - "io/fs" "log" "os" - "path" "path/filepath" - "runtime" - "sort" - "strconv" - "strings" "sync" "sync/atomic" + "time" + + "github.com/ClusterCockpit/cc-metric-store/internal/config" ) -// Whenever changed, update MarshalJSON as well! -type CheckpointMetrics struct { - Frequency int64 `json:"frequency"` - Start int64 `json:"start"` - Data []Float `json:"data"` -} - -// As `Float` implements a custom MarshalJSON() function, -// serializing an array of such types has more overhead -// than one would assume (because of extra allocations, interfaces and so on). -func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) { - buf := make([]byte, 0, 128+len(cm.Data)*8) - buf = append(buf, `{"frequency":`...) - buf = strconv.AppendInt(buf, cm.Frequency, 10) - buf = append(buf, `,"start":`...) - buf = strconv.AppendInt(buf, cm.Start, 10) - buf = append(buf, `,"data":[`...) - for i, x := range cm.Data { - if i != 0 { - buf = append(buf, ',') +func Archiving(wg *sync.WaitGroup, ctx context.Context) { + go func() { + defer wg.Done() + d, err := time.ParseDuration(config.Keys.Archive.Interval) + if err != nil { + log.Fatal(err) } - if x.IsNaN() { - buf = append(buf, `null`...) - } else { - buf = strconv.AppendFloat(buf, float64(x), 'f', 1, 32) + if d <= 0 { + return } - } - buf = append(buf, `]}`...) - return buf, nil -} -type CheckpointFile struct { - From int64 `json:"from"` - To int64 `json:"to"` - Metrics map[string]*CheckpointMetrics `json:"metrics"` - Children map[string]*CheckpointFile `json:"children"` + ticks := func() <-chan time.Time { + if d <= 0 { + return nil + } + return time.NewTicker(d).C + }() + for { + select { + case <-ctx.Done(): + return + case <-ticks: + t := time.Now().Add(-d) + log.Printf("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339)) + n, err := ArchiveCheckpoints(config.Keys.Checkpoints.RootDir, config.Keys.Archive.RootDir, t.Unix(), config.Keys.Archive.DeleteInstead) + if err != nil { + log.Printf("archiving failed: %s\n", err.Error()) + } else { + log.Printf("done: %d files zipped and moved to archive\n", n) + } + } + } + }() } var ErrNoNewData error = errors.New("all data already archived") -var NumWorkers int = 4 - -func init() { - maxWorkers := 10 - NumWorkers = runtime.NumCPU()/2 + 1 - if NumWorkers > maxWorkers { - NumWorkers = maxWorkers - } -} - -// Metrics stored at the lowest 2 levels are not stored away (root and cluster)! -// On a per-host basis a new JSON file is created. I have no idea if this will scale. -// The good thing: Only a host at a time is locked, so this function can run -// in parallel to writes/reads. -func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { - levels := make([]*Level, 0) - selectors := make([][]string, 0) - m.root.lock.RLock() - for sel1, l1 := range m.root.children { - l1.lock.RLock() - for sel2, l2 := range l1.children { - levels = append(levels, l2) - selectors = append(selectors, []string{sel1, sel2}) - } - l1.lock.RUnlock() - } - m.root.lock.RUnlock() - - type workItem struct { - level *Level - dir string - selector []string - } - - n, errs := int32(0), int32(0) - - var wg sync.WaitGroup - wg.Add(NumWorkers) - work := make(chan workItem, NumWorkers*2) - for worker := 0; worker < NumWorkers; worker++ { - go func() { - defer wg.Done() - - for workItem := range work { - if err := workItem.level.toCheckpoint(workItem.dir, from, to, m); err != nil { - if err == ErrNoNewData { - continue - } - - log.Printf("error while checkpointing %#v: %s", workItem.selector, err.Error()) - atomic.AddInt32(&errs, 1) - } else { - atomic.AddInt32(&n, 1) - } - } - }() - } - - for i := 0; i < len(levels); i++ { - dir := path.Join(dir, path.Join(selectors[i]...)) - work <- workItem{ - level: levels[i], - dir: dir, - selector: selectors[i], - } - } - - close(work) - wg.Wait() - - if errs > 0 { - return int(n), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n) - } - return int(n), nil -} - -func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) { - l.lock.RLock() - defer l.lock.RUnlock() - - retval := &CheckpointFile{ - From: from, - To: to, - Metrics: make(map[string]*CheckpointMetrics), - Children: make(map[string]*CheckpointFile), - } - - for metric, minfo := range m.Metrics { - b := l.metrics[minfo.offset] - if b == nil { - continue - } - - allArchived := true - b.iterFromTo(from, to, func(b *buffer) error { - if !b.archived { - allArchived = false - } - return nil - }) - - if allArchived { - continue - } - - data := make([]Float, (to-from)/b.frequency+1) - data, start, end, err := b.read(from, to, data) - if err != nil { - return nil, err - } - - for i := int((end - start) / b.frequency); i < len(data); i++ { - data[i] = NaN - } - - retval.Metrics[metric] = &CheckpointMetrics{ - Frequency: b.frequency, - Start: start, - Data: data, - } - } - - for name, child := range l.children { - val, err := child.toCheckpointFile(from, to, m) - if err != nil { - return nil, err - } - - if val != nil { - retval.Children[name] = val - } - } - - if len(retval.Children) == 0 && len(retval.Metrics) == 0 { - return nil, nil - } - - return retval, nil -} - -func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { - cf, err := l.toCheckpointFile(from, to, m) - if err != nil { - return err - } - - if cf == nil { - return ErrNoNewData - } - - filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) - f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644) - if err != nil && os.IsNotExist(err) { - err = os.MkdirAll(dir, 0o755) - if err == nil { - f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644) - } - } - if err != nil { - return err - } - defer f.Close() - - bw := bufio.NewWriter(f) - if err = json.NewEncoder(bw).Encode(cf); err != nil { - return err - } - - return bw.Flush() -} - -// Metrics stored at the lowest 2 levels are not loaded (root and cluster)! -// This function can only be called once and before the very first write or read. -// Different host's data is loaded to memory in parallel. -func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { - var wg sync.WaitGroup - work := make(chan [2]string, NumWorkers) - n, errs := int32(0), int32(0) - - wg.Add(NumWorkers) - for worker := 0; worker < NumWorkers; worker++ { - go func() { - defer wg.Done() - for host := range work { - lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics)) - nn, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m) - if err != nil { - log.Fatalf("error while loading checkpoints: %s", err.Error()) - atomic.AddInt32(&errs, 1) - } - atomic.AddInt32(&n, int32(nn)) - } - }() - } - - i := 0 - clustersDir, err := os.ReadDir(dir) - for _, clusterDir := range clustersDir { - if !clusterDir.IsDir() { - err = errors.New("expected only directories at first level of checkpoints/ directory") - goto done - } - - hostsDir, e := os.ReadDir(filepath.Join(dir, clusterDir.Name())) - if e != nil { - err = e - goto done - } - - for _, hostDir := range hostsDir { - if !hostDir.IsDir() { - err = errors.New("expected only directories at second level of checkpoints/ directory") - goto done - } - - i++ - if i%NumWorkers == 0 && i > 100 { - // Forcing garbage collection runs here regulary during the loading of checkpoints - // will decrease the total heap size after loading everything back to memory is done. - // While loading data, the heap will grow fast, so the GC target size will double - // almost always. By forcing GCs here, we can keep it growing more slowly so that - // at the end, less memory is wasted. - runtime.GC() - } - - work <- [2]string{clusterDir.Name(), hostDir.Name()} - } - } -done: - close(work) - wg.Wait() - - if err != nil { - return int(n), err - } - - if errs > 0 { - return int(n), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n) - } - return int(n), nil -} - -func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { - for name, metric := range cf.Metrics { - n := len(metric.Data) - b := &buffer{ - frequency: metric.Frequency, - start: metric.Start, - data: metric.Data[0:n:n], // Space is wasted here :( - prev: nil, - next: nil, - archived: true, - } - b.close() - - minfo, ok := m.Metrics[name] - if !ok { - continue - // return errors.New("Unkown metric: " + name) - } - - prev := l.metrics[minfo.offset] - if prev == nil { - l.metrics[minfo.offset] = b - } else { - if prev.start > b.start { - return errors.New("wooops") - } - - b.prev = prev - prev.next = b - } - l.metrics[minfo.offset] = b - } - - if len(cf.Children) > 0 && l.children == nil { - l.children = make(map[string]*Level) - } - - for sel, childCf := range cf.Children { - child, ok := l.children[sel] - if !ok { - child = &Level{ - metrics: make([]*buffer, len(m.Metrics)), - children: nil, - } - l.children[sel] = child - } - - if err := child.loadFile(childCf, m); err != nil { - return err - } - } - - return nil -} - -func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) { - direntries, err := os.ReadDir(dir) - if err != nil { - if os.IsNotExist(err) { - return 0, nil - } - - return 0, err - } - - jsonFiles := make([]fs.DirEntry, 0) - filesLoaded := 0 - for _, e := range direntries { - if e.IsDir() { - child := &Level{ - metrics: make([]*buffer, len(m.Metrics)), - children: make(map[string]*Level), - } - - files, err := child.fromCheckpoint(path.Join(dir, e.Name()), from, m) - filesLoaded += files - if err != nil { - return filesLoaded, err - } - - l.children[e.Name()] = child - } else if strings.HasSuffix(e.Name(), ".json") { - jsonFiles = append(jsonFiles, e) - } else { - return filesLoaded, errors.New("unexpected file: " + dir + "/" + e.Name()) - } - } - - files, err := findFiles(jsonFiles, from, true) - if err != nil { - return filesLoaded, err - } - - for _, filename := range files { - f, err := os.Open(path.Join(dir, filename)) - if err != nil { - return filesLoaded, err - } - defer f.Close() - - br := bufio.NewReader(f) - cf := &CheckpointFile{} - if err = json.NewDecoder(br).Decode(cf); err != nil { - return filesLoaded, err - } - - if cf.To != 0 && cf.To < from { - continue - } - - if err = l.loadFile(cf, m); err != nil { - return filesLoaded, err - } - - filesLoaded += 1 - } - - return filesLoaded, nil -} - -// This will probably get very slow over time! -// A solution could be some sort of an index file in which all other files -// and the timespan they contain is listed. -func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) { - nums := map[string]int64{} - for _, e := range direntries { - ts, err := strconv.ParseInt(strings.TrimSuffix(e.Name(), ".json"), 10, 64) - if err != nil { - return nil, err - } - nums[e.Name()] = ts - } - - sort.Slice(direntries, func(i, j int) bool { - a, b := direntries[i], direntries[j] - return nums[a.Name()] < nums[b.Name()] - }) - - filenames := make([]string, 0) - for i := 0; i < len(direntries); i++ { - e := direntries[i] - ts1 := nums[e.Name()] - - if findMoreRecentFiles && t <= ts1 || i == len(direntries)-1 { - filenames = append(filenames, e.Name()) - continue - } - - enext := direntries[i+1] - ts2 := nums[enext.Name()] - - if findMoreRecentFiles { - if ts1 < t && t < ts2 { - filenames = append(filenames, e.Name()) - } - } else { - if ts2 < t { - filenames = append(filenames, e.Name()) - } - } - } - - return filenames, nil -} - // ZIP all checkpoint files older than `from` together and write them to the `archiveDir`, // deleting them from the `checkpointsDir`. func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64, deleteInstead bool) (int, error) { diff --git a/internal/memorystore/buffer.go b/internal/memorystore/buffer.go index 397be97..34fee5d 100644 --- a/internal/memorystore/buffer.go +++ b/internal/memorystore/buffer.go @@ -34,21 +34,13 @@ var ( // If `cap(data)` is reached, a new buffer is created and // becomes the new head of a buffer list. type buffer struct { - frequency int64 // Time between two "slots" - start int64 // Timestamp of when `data[0]` was written. - data []util.Float // The slice should never reallocacte as `cap(data)` is respected. - prev, next *buffer // `prev` contains older data, `next` newer data. - archived bool // If true, this buffer is already archived - - closed bool - /* - statisticts struct { - samples int - min Float - max Float - avg Float - } - */ + prev *buffer + next *buffer + data []util.Float + frequency int64 + start int64 + archived bool + closed bool } func newBuffer(ts, freq int64) *buffer { @@ -163,8 +155,8 @@ func (b *buffer) read(from, to int64, data []util.Float) ([]util.Float, int64, i from = b.firstWrite() } - var i int = 0 - var t int64 = from + i := 0 + t := from for ; t < to; t += b.frequency { idx := int((t - b.start) / b.frequency) if idx >= cap(b.data) { diff --git a/internal/memorystore/checkpoint.go b/internal/memorystore/checkpoint.go new file mode 100644 index 0000000..9b036d5 --- /dev/null +++ b/internal/memorystore/checkpoint.go @@ -0,0 +1,501 @@ +package memorystore + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io/fs" + "log" + "os" + "path" + "path/filepath" + "runtime" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/ClusterCockpit/cc-metric-store/internal/config" + "github.com/ClusterCockpit/cc-metric-store/internal/util" +) + +// Whenever changed, update MarshalJSON as well! +type CheckpointMetrics struct { + Data []util.Float `json:"data"` + Frequency int64 `json:"frequency"` + Start int64 `json:"start"` +} + +type CheckpointFile struct { + Metrics map[string]*CheckpointMetrics `json:"metrics"` + Children map[string]*CheckpointFile `json:"children"` + From int64 `json:"from"` + To int64 `json:"to"` +} + +var lastCheckpoint time.Time + +func Checkpointing(wg *sync.WaitGroup, ctx context.Context) { + lastCheckpoint = time.Now() + ms := GetMemoryStore() + + go func() { + defer wg.Done() + d, err := time.ParseDuration(config.Keys.Checkpoints.Interval) + if err != nil { + log.Fatal(err) + } + if d <= 0 { + return + } + + ticks := func() <-chan time.Time { + if d <= 0 { + return nil + } + return time.NewTicker(d).C + }() + for { + select { + case <-ctx.Done(): + return + case <-ticks: + log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339)) + now := time.Now() + n, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, + lastCheckpoint.Unix(), now.Unix()) + if err != nil { + log.Printf("checkpointing failed: %s\n", err.Error()) + } else { + log.Printf("done: %d checkpoint files created\n", n) + lastCheckpoint = now + } + } + } + }() +} + +// As `Float` implements a custom MarshalJSON() function, +// serializing an array of such types has more overhead +// than one would assume (because of extra allocations, interfaces and so on). +func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) { + buf := make([]byte, 0, 128+len(cm.Data)*8) + buf = append(buf, `{"frequency":`...) + buf = strconv.AppendInt(buf, cm.Frequency, 10) + buf = append(buf, `,"start":`...) + buf = strconv.AppendInt(buf, cm.Start, 10) + buf = append(buf, `,"data":[`...) + for i, x := range cm.Data { + if i != 0 { + buf = append(buf, ',') + } + if x.IsNaN() { + buf = append(buf, `null`...) + } else { + buf = strconv.AppendFloat(buf, float64(x), 'f', 1, 32) + } + } + buf = append(buf, `]}`...) + return buf, nil +} + +// Metrics stored at the lowest 2 levels are not stored away (root and cluster)! +// On a per-host basis a new JSON file is created. I have no idea if this will scale. +// The good thing: Only a host at a time is locked, so this function can run +// in parallel to writes/reads. +func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { + levels := make([]*Level, 0) + selectors := make([][]string, 0) + m.root.lock.RLock() + for sel1, l1 := range m.root.children { + l1.lock.RLock() + for sel2, l2 := range l1.children { + levels = append(levels, l2) + selectors = append(selectors, []string{sel1, sel2}) + } + l1.lock.RUnlock() + } + m.root.lock.RUnlock() + + type workItem struct { + level *Level + dir string + selector []string + } + + n, errs := int32(0), int32(0) + + var wg sync.WaitGroup + wg.Add(NumWorkers) + work := make(chan workItem, NumWorkers*2) + for worker := 0; worker < NumWorkers; worker++ { + go func() { + defer wg.Done() + + for workItem := range work { + if err := workItem.level.toCheckpoint(workItem.dir, from, to, m); err != nil { + if err == ErrNoNewData { + continue + } + + log.Printf("error while checkpointing %#v: %s", workItem.selector, err.Error()) + atomic.AddInt32(&errs, 1) + } else { + atomic.AddInt32(&n, 1) + } + } + }() + } + + for i := 0; i < len(levels); i++ { + dir := path.Join(dir, path.Join(selectors[i]...)) + work <- workItem{ + level: levels[i], + dir: dir, + selector: selectors[i], + } + } + + close(work) + wg.Wait() + + if errs > 0 { + return int(n), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n) + } + return int(n), nil +} + +func (l *Level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) { + l.lock.RLock() + defer l.lock.RUnlock() + + retval := &CheckpointFile{ + From: from, + To: to, + Metrics: make(map[string]*CheckpointMetrics), + Children: make(map[string]*CheckpointFile), + } + + for metric, minfo := range m.Metrics { + b := l.metrics[minfo.Offset] + if b == nil { + continue + } + + allArchived := true + b.iterFromTo(from, to, func(b *buffer) error { + if !b.archived { + allArchived = false + } + return nil + }) + + if allArchived { + continue + } + + data := make([]util.Float, (to-from)/b.frequency+1) + data, start, end, err := b.read(from, to, data) + if err != nil { + return nil, err + } + + for i := int((end - start) / b.frequency); i < len(data); i++ { + data[i] = util.NaN + } + + retval.Metrics[metric] = &CheckpointMetrics{ + Frequency: b.frequency, + Start: start, + Data: data, + } + } + + for name, child := range l.children { + val, err := child.toCheckpointFile(from, to, m) + if err != nil { + return nil, err + } + + if val != nil { + retval.Children[name] = val + } + } + + if len(retval.Children) == 0 && len(retval.Metrics) == 0 { + return nil, nil + } + + return retval, nil +} + +func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { + cf, err := l.toCheckpointFile(from, to, m) + if err != nil { + return err + } + + if cf == nil { + return ErrNoNewData + } + + filepath := path.Join(dir, fmt.Sprintf("%d.json", from)) + f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil && os.IsNotExist(err) { + err = os.MkdirAll(dir, 0o755) + if err == nil { + f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644) + } + } + if err != nil { + return err + } + defer f.Close() + + bw := bufio.NewWriter(f) + if err = json.NewEncoder(bw).Encode(cf); err != nil { + return err + } + + return bw.Flush() +} + +// Metrics stored at the lowest 2 levels are not loaded (root and cluster)! +// This function can only be called once and before the very first write or read. +// Different host's data is loaded to memory in parallel. +func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { + var wg sync.WaitGroup + work := make(chan [2]string, NumWorkers) + n, errs := int32(0), int32(0) + + wg.Add(NumWorkers) + for worker := 0; worker < NumWorkers; worker++ { + go func() { + defer wg.Done() + for host := range work { + lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics)) + nn, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m) + if err != nil { + log.Fatalf("error while loading checkpoints: %s", err.Error()) + atomic.AddInt32(&errs, 1) + } + atomic.AddInt32(&n, int32(nn)) + } + }() + } + + i := 0 + clustersDir, err := os.ReadDir(dir) + for _, clusterDir := range clustersDir { + if !clusterDir.IsDir() { + err = errors.New("expected only directories at first level of checkpoints/ directory") + goto done + } + + hostsDir, e := os.ReadDir(filepath.Join(dir, clusterDir.Name())) + if e != nil { + err = e + goto done + } + + for _, hostDir := range hostsDir { + if !hostDir.IsDir() { + err = errors.New("expected only directories at second level of checkpoints/ directory") + goto done + } + + i++ + if i%NumWorkers == 0 && i > 100 { + // Forcing garbage collection runs here regulary during the loading of checkpoints + // will decrease the total heap size after loading everything back to memory is done. + // While loading data, the heap will grow fast, so the GC target size will double + // almost always. By forcing GCs here, we can keep it growing more slowly so that + // at the end, less memory is wasted. + runtime.GC() + } + + work <- [2]string{clusterDir.Name(), hostDir.Name()} + } + } +done: + close(work) + wg.Wait() + + if err != nil { + return int(n), err + } + + if errs > 0 { + return int(n), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n) + } + return int(n), nil +} + +func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { + for name, metric := range cf.Metrics { + n := len(metric.Data) + b := &buffer{ + frequency: metric.Frequency, + start: metric.Start, + data: metric.Data[0:n:n], // Space is wasted here :( + prev: nil, + next: nil, + archived: true, + } + b.close() + + minfo, ok := m.Metrics[name] + if !ok { + continue + // return errors.New("Unkown metric: " + name) + } + + prev := l.metrics[minfo.Offset] + if prev == nil { + l.metrics[minfo.Offset] = b + } else { + if prev.start > b.start { + return errors.New("wooops") + } + + b.prev = prev + prev.next = b + } + l.metrics[minfo.Offset] = b + } + + if len(cf.Children) > 0 && l.children == nil { + l.children = make(map[string]*Level) + } + + for sel, childCf := range cf.Children { + child, ok := l.children[sel] + if !ok { + child = &Level{ + metrics: make([]*buffer, len(m.Metrics)), + children: nil, + } + l.children[sel] = child + } + + if err := child.loadFile(childCf, m); err != nil { + return err + } + } + + return nil +} + +func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) { + direntries, err := os.ReadDir(dir) + if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + + return 0, err + } + + jsonFiles := make([]fs.DirEntry, 0) + filesLoaded := 0 + for _, e := range direntries { + if e.IsDir() { + child := &Level{ + metrics: make([]*buffer, len(m.Metrics)), + children: make(map[string]*Level), + } + + files, err := child.fromCheckpoint(path.Join(dir, e.Name()), from, m) + filesLoaded += files + if err != nil { + return filesLoaded, err + } + + l.children[e.Name()] = child + } else if strings.HasSuffix(e.Name(), ".json") { + jsonFiles = append(jsonFiles, e) + } else { + return filesLoaded, errors.New("unexpected file: " + dir + "/" + e.Name()) + } + } + + files, err := findFiles(jsonFiles, from, true) + if err != nil { + return filesLoaded, err + } + + for _, filename := range files { + f, err := os.Open(path.Join(dir, filename)) + if err != nil { + return filesLoaded, err + } + defer f.Close() + + br := bufio.NewReader(f) + cf := &CheckpointFile{} + if err = json.NewDecoder(br).Decode(cf); err != nil { + return filesLoaded, err + } + + if cf.To != 0 && cf.To < from { + continue + } + + if err = l.loadFile(cf, m); err != nil { + return filesLoaded, err + } + + filesLoaded += 1 + } + + return filesLoaded, nil +} + +// This will probably get very slow over time! +// A solution could be some sort of an index file in which all other files +// and the timespan they contain is listed. +func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) { + nums := map[string]int64{} + for _, e := range direntries { + ts, err := strconv.ParseInt(strings.TrimSuffix(e.Name(), ".json"), 10, 64) + if err != nil { + return nil, err + } + nums[e.Name()] = ts + } + + sort.Slice(direntries, func(i, j int) bool { + a, b := direntries[i], direntries[j] + return nums[a.Name()] < nums[b.Name()] + }) + + filenames := make([]string, 0) + for i := 0; i < len(direntries); i++ { + e := direntries[i] + ts1 := nums[e.Name()] + + if findMoreRecentFiles && t <= ts1 || i == len(direntries)-1 { + filenames = append(filenames, e.Name()) + continue + } + + enext := direntries[i+1] + ts2 := nums[enext.Name()] + + if findMoreRecentFiles { + if ts1 < t && t < ts2 { + filenames = append(filenames, e.Name()) + } + } else { + if ts2 < t { + filenames = append(filenames, e.Name()) + } + } + } + + return filenames, nil +} diff --git a/internal/memorystore/debug.go b/internal/memorystore/debug.go index 59a978b..2743a45 100644 --- a/internal/memorystore/debug.go +++ b/internal/memorystore/debug.go @@ -41,7 +41,7 @@ func (l *Level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf [ depth += 1 objitems := 0 for name, mc := range m.Metrics { - if b := l.metrics[mc.offset]; b != nil { + if b := l.metrics[mc.Offset]; b != nil { for i := 0; i < depth; i++ { buf = append(buf, '\t') } diff --git a/internal/memorystore/level.go b/internal/memorystore/level.go index 34a58a2..4bbfe7c 100644 --- a/internal/memorystore/level.go +++ b/internal/memorystore/level.go @@ -12,9 +12,9 @@ import ( // Can be both a leaf or a inner node. In this tree structue, inner nodes can // also hold data (in `metrics`). type Level struct { + children map[string]*Level + metrics []*buffer lock sync.RWMutex - metrics []*buffer // Every level can store metrics. - children map[string]*Level // Lower levels. } // Find the correct level for the given selector, creating it if @@ -126,7 +126,7 @@ func (l *Level) findLevel(selector []string) *Level { return lvl.findLevel(selector[1:]) } -func (l *Level) findBuffers(selector Selector, offset int, f func(b *buffer) error) error { +func (l *Level) findBuffers(selector util.Selector, offset int, f func(b *buffer) error) error { l.lock.RLock() defer l.lock.RUnlock() diff --git a/internal/memorystore/memorystore.go b/internal/memorystore/memorystore.go index 305ebdd..4868a85 100644 --- a/internal/memorystore/memorystore.go +++ b/internal/memorystore/memorystore.go @@ -1,9 +1,12 @@ package memorystore import ( + "context" "errors" "log" + "runtime" "sync" + "time" "github.com/ClusterCockpit/cc-metric-store/internal/config" "github.com/ClusterCockpit/cc-metric-store/internal/util" @@ -14,6 +17,16 @@ var ( msInstance *MemoryStore ) +var NumWorkers int = 4 + +func init() { + maxWorkers := 10 + NumWorkers = runtime.NumCPU()/2 + 1 + if NumWorkers > maxWorkers { + NumWorkers = maxWorkers + } +} + type Metric struct { Name string Value util.Float @@ -21,8 +34,8 @@ type Metric struct { } type MemoryStore struct { - root Level // root of the tree structure Metrics map[string]config.MetricConfig + root Level } // Create a new, initialized instance of a MemoryStore. @@ -61,6 +74,54 @@ func GetMemoryStore() *MemoryStore { return msInstance } +func Shutdown() { + ms := GetMemoryStore() + log.Printf("Writing to '%s'...\n", config.Keys.Checkpoints.RootDir) + files, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) + if err != nil { + log.Printf("Writing checkpoint failed: %s\n", err.Error()) + } + log.Printf("Done! (%d files written)\n", files) +} + +func Retention(wg *sync.WaitGroup, ctx context.Context) { + ms := GetMemoryStore() + + go func() { + defer wg.Done() + d, err := time.ParseDuration(config.Keys.RetentionInMemory) + if err != nil { + log.Fatal(err) + } + if d <= 0 { + return + } + + ticks := func() <-chan time.Time { + d := d / 2 + if d <= 0 { + return nil + } + return time.NewTicker(d).C + }() + for { + select { + case <-ctx.Done(): + return + case <-ticks: + t := time.Now().Add(-d) + log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) + freed, err := ms.Free(nil, t.Unix()) + if err != nil { + log.Printf("freeing up buffers failed: %s\n", err.Error()) + } else { + log.Printf("done: %d buffers freed\n", freed) + } + } + } + }() +} + // Write all values in `metrics` to the level specified by `selector` for time `ts`. // Look at `findLevelOrCreate` for how selectors work. func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error { @@ -117,7 +178,7 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric // If the level does not hold the metric itself, the data will be aggregated recursively from the children. // The second and third return value are the actual from/to for the data. Those can be different from // the range asked for if no data was available. -func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]util.Float, int64, int64, error) { +func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64) ([]util.Float, int64, int64, error) { if from > to { return nil, 0, 0, errors.New("invalid time range") } diff --git a/internal/memorystore/stats.go b/internal/memorystore/stats.go index 3240d02..5ddecfc 100644 --- a/internal/memorystore/stats.go +++ b/internal/memorystore/stats.go @@ -66,7 +66,7 @@ func (b *buffer) stats(from, to int64) (Stats, int64, int64, error) { // Returns statistics for the requested metric on the selected node/level. // Data is aggregated to the selected level the same way as in `MemoryStore.Read`. // If `Stats.Samples` is zero, the statistics should not be considered as valid. -func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (*Stats, int64, int64, error) { +func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int64) (*Stats, int64, int64, error) { if from > to { return nil, 0, 0, errors.New("invalid time range") } diff --git a/internal/memorystore/selector.go b/internal/util/selector.go similarity index 97% rename from internal/memorystore/selector.go rename to internal/util/selector.go index 0b24300..27557ef 100644 --- a/internal/memorystore/selector.go +++ b/internal/util/selector.go @@ -1,4 +1,4 @@ -package memorystore +package util import ( "encoding/json" @@ -6,9 +6,9 @@ import ( ) type SelectorElement struct { - Any bool String string Group []string + Any bool } func (se *SelectorElement) UnmarshalJSON(input []byte) error {