From d54bedc82f591d69be8fffe8419783c8b903388a Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Tue, 26 Jul 2022 10:04:58 +0200 Subject: [PATCH] Fix names and add missing methods --- cmd/cc-metric-store/cc-metric-store.go | 99 +++++--------- internal/api/lineprotocol.go | 2 +- internal/api/lineprotocol_test.go | 2 +- internal/memstore/chunks.go | 4 +- internal/memstore/debug.go | 20 +-- internal/memstore/memstore.go | 108 +++++++++++++++ internal/memstore/memstore_test.go | 173 ++++++++++++------------- 7 files changed, 239 insertions(+), 169 deletions(-) diff --git a/cmd/cc-metric-store/cc-metric-store.go b/cmd/cc-metric-store/cc-metric-store.go index 8e69c95..bb2606f 100644 --- a/cmd/cc-metric-store/cc-metric-store.go +++ b/cmd/cc-metric-store/cc-metric-store.go @@ -6,7 +6,6 @@ import ( "encoding/json" "flag" "fmt" - "io" "log" "os" "os/signal" @@ -16,7 +15,12 @@ import ( "syscall" "time" + "github.com/ClusterCockpit/cc-metric-store/internal/api" + "github.com/ClusterCockpit/cc-metric-store/internal/api/apiv1" + "github.com/ClusterCockpit/cc-metric-store/internal/memstore" + "github.com/ClusterCockpit/cc-metric-store/internal/types" "github.com/google/gops/agent" + "github.com/influxdata/line-protocol/v2/lineprotocol" ) // For aggregation over multiple values at different cpus/sockets/..., not time! @@ -47,17 +51,6 @@ func (as *AggregationStrategy) UnmarshalJSON(data []byte) error { return nil } -type MetricConfig struct { - // Interval in seconds at which measurements will arive. - Frequency int64 `json:"frequency"` - - // Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy. - Aggregation AggregationStrategy `json:"aggregation"` - - // Private, used internally... - offset int -} - type HttpConfig struct { // Address to bind to, for example "0.0.0.0:8081" Address string `json:"address"` @@ -69,29 +62,12 @@ type HttpConfig struct { KeyFile string `json:"https-key-file"` } -type NatsConfig struct { - // Address of the nats server - Address string `json:"address"` - - // Username/Password, optional - Username string `json:"username"` - Password string `json:"password"` - - Subscriptions []struct { - // Channel name - SubscribeTo string `json:"subscribe-to"` - - // Allow lines without a cluster tag, use this as default, optional - ClusterTag string `json:"cluster-tag"` - } `json:"subscriptions"` -} - type Config struct { - Metrics map[string]MetricConfig `json:"metrics"` - RetentionInMemory string `json:"retention-in-memory"` - Nats []*NatsConfig `json:"nats"` - JwtPublicKey string `json:"jwt-public-key"` - HttpConfig *HttpConfig `json:"http-api"` + Metrics map[string]types.MetricConfig `json:"metrics"` + RetentionInMemory string `json:"retention-in-memory"` + Nats []*api.NatsConfig `json:"nats"` + JwtPublicKey string `json:"jwt-public-key"` + HttpConfig *HttpConfig `json:"http-api"` Checkpoints struct { Interval string `json:"interval"` RootDir string `json:"directory"` @@ -103,18 +79,14 @@ type Config struct { DeleteInstead bool `json:"delete-instead"` } `json:"archive"` Debug struct { - EnableGops bool `json:"gops"` - DumpToFile string `json:"dump-to-file"` + EnableGops bool `json:"gops"` } `json:"debug"` } var conf Config -var memoryStore *MemoryStore = nil +var memoryStore *memstore.MemoryStore = nil var lastCheckpoint time.Time -var debugDumpLock sync.Mutex -var debugDump io.Writer = io.Discard - func loadConfiguration(file string) Config { var config Config configFile, err := os.Open(file) @@ -163,12 +135,8 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) { case <-ticks: t := time.Now().Add(-d) log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) - freed, err := memoryStore.Free(nil, t.Unix()) - if err != nil { - log.Printf("freeing up buffers failed: %s\n", err.Error()) - } else { - log.Printf("done: %d buffers freed\n", freed) - } + freed := memoryStore.Free(t.Unix()) + log.Printf("done: %d buffers freed\n", freed) } } }() @@ -192,7 +160,7 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) { case <-ticks: log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339)) now := time.Now() - n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, + n, err := memoryStore.ToJSONCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), now.Unix()) if err != nil { log.Printf("checkpointing failed: %s\n", err.Error()) @@ -222,7 +190,7 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) { case <-ticks: t := time.Now().Add(-d) log.Printf("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339)) - n, err := ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix(), conf.Archive.DeleteInstead) + n, err := memstore.ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix(), conf.Archive.DeleteInstead) if err != nil { log.Printf("archiving failed: %s\n", err.Error()) } else { @@ -242,7 +210,7 @@ func main() { startupTime := time.Now() conf = loadConfiguration(configFile) - memoryStore = NewMemoryStore(conf.Metrics) + memoryStore = memstore.NewMemoryStore(conf.Metrics) if enableGopsAgent || conf.Debug.EnableGops { if err := agent.Listen(agent.Options{}); err != nil { @@ -250,15 +218,6 @@ func main() { } } - if conf.Debug.DumpToFile != "" { - f, err := os.Create(conf.Debug.DumpToFile) - if err != nil { - log.Fatal(err) - } - - debugDump = f - } - d, err := time.ParseDuration(conf.Checkpoints.Restore) if err != nil { log.Fatal(err) @@ -266,7 +225,7 @@ func main() { restoreFrom := startupTime.Add(-d) log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339)) - files, err := memoryStore.FromCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix()) + files, err := memoryStore.FromJSONCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix()) loadedData := memoryStore.SizeInBytes() / 1024 / 1024 // In MB if err != nil { log.Fatalf("Loading checkpoints failed: %s\n", err.Error()) @@ -307,8 +266,16 @@ func main() { wg.Add(1) + httpApi := apiv1.HttpApi{ + MemoryStore: memoryStore, + PublicKey: conf.JwtPublicKey, + Address: conf.HttpConfig.Address, + CertFile: conf.HttpConfig.CertFile, + KeyFile: conf.HttpConfig.KeyFile, + } + go func() { - err := StartApiServer(ctx, conf.HttpConfig) + err := httpApi.StartServer(ctx) if err != nil { log.Fatal(err) } @@ -322,7 +289,9 @@ func main() { nc := natsConf go func() { // err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx) - err := ReceiveNats(nc, decodeLine, 1, ctx) + err := api.ReceiveNats(nc, func(d *lineprotocol.Decoder, s string) error { + return api.DecodeLine(memoryStore, d, s) + }, 1, ctx) if err != nil { log.Fatal(err) @@ -335,15 +304,9 @@ func main() { wg.Wait() log.Printf("Writing to '%s'...\n", conf.Checkpoints.RootDir) - files, err = memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) + files, err = memoryStore.ToJSONCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix()) if err != nil { log.Printf("Writing checkpoint failed: %s\n", err.Error()) } log.Printf("Done! (%d files written)\n", files) - - if closer, ok := debugDump.(io.Closer); ok { - if err := closer.Close(); err != nil { - log.Printf("error: %s", err.Error()) - } - } } diff --git a/internal/api/lineprotocol.go b/internal/api/lineprotocol.go index 780b998..d484e47 100644 --- a/internal/api/lineprotocol.go +++ b/internal/api/lineprotocol.go @@ -187,7 +187,7 @@ func reorder(buf, prefix []byte) []byte { // Decode lines using dec and make write calls to the MemoryStore. // If a line is missing its cluster tag, use clusterDefault as default. -func decodeLine(memoryStore *memstore.MemoryStore, dec *lineprotocol.Decoder, clusterDefault string) error { +func DecodeLine(memoryStore *memstore.MemoryStore, dec *lineprotocol.Decoder, clusterDefault string) error { // Reduce allocations in loop: t := time.Now() metric, metricBuf := types.Metric{}, make([]byte, 0, 16) diff --git a/internal/api/lineprotocol_test.go b/internal/api/lineprotocol_test.go index dce1310..0b68505 100644 --- a/internal/api/lineprotocol_test.go +++ b/internal/api/lineprotocol_test.go @@ -139,7 +139,7 @@ func BenchmarkLineprotocolDecoder(b *testing.B) { dec := lineprotocol.NewDecoderWithBytes(data) b.StartTimer() - if err := decodeLine(memoryStore, dec, "ctest"); err != nil { + if err := DecodeLine(memoryStore, dec, "ctest"); err != nil { b.Fatal(err) } b.StopTimer() diff --git a/internal/memstore/chunks.go b/internal/memstore/chunks.go index b84adaf..b0bb0a7 100644 --- a/internal/memstore/chunks.go +++ b/internal/memstore/chunks.go @@ -124,7 +124,7 @@ func (c *chunk) read(from, to int64, data []types.Float) ([]types.Float, int64, return data[:i], from, t, nil } -func (c *chunk) stats(from, to int64, data []types.Float) (types.Stats, int64, int64, error) { +func (c *chunk) stats(from, to int64) (types.Stats, int64, int64, error) { stats := types.Stats{ Samples: 0, Min: types.Float(math.MaxFloat64), @@ -134,7 +134,7 @@ func (c *chunk) stats(from, to int64, data []types.Float) (types.Stats, int64, i if from < c.firstWrite() { if c.prev != nil { - return c.prev.stats(from, to, data) + return c.prev.stats(from, to) } from = c.firstWrite() } diff --git a/internal/memstore/debug.go b/internal/memstore/debug.go index 88af59f..3f8e954 100644 --- a/internal/memstore/debug.go +++ b/internal/memstore/debug.go @@ -1,4 +1,4 @@ -package main +package memstore import ( "bufio" @@ -6,22 +6,22 @@ import ( "strconv" ) -func (b *buffer) debugDump(buf []byte) []byte { - if b.prev != nil { - buf = b.prev.debugDump(buf) +func (c *chunk) debugDump(buf []byte) []byte { + if c.prev != nil { + buf = c.prev.debugDump(buf) } - start, len, end := b.start, len(b.data), b.start+b.frequency*int64(len(b.data)) + start, len, end := c.start, len(c.data), c.start+c.frequency*int64(len(c.data)) buf = append(buf, `{"start":`...) buf = strconv.AppendInt(buf, start, 10) buf = append(buf, `,"len":`...) buf = strconv.AppendInt(buf, int64(len), 10) buf = append(buf, `,"end":`...) buf = strconv.AppendInt(buf, end, 10) - if b.archived { + if c.checkpointed { buf = append(buf, `,"saved":true`...) } - if b.next != nil { + if c.next != nil { buf = append(buf, `},`...) } else { buf = append(buf, `}`...) @@ -29,7 +29,7 @@ func (b *buffer) debugDump(buf []byte) []byte { return buf } -func (l *level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf []byte, depth int) ([]byte, error) { +func (l *Level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf []byte, depth int) ([]byte, error) { l.lock.RLock() defer l.lock.RUnlock() for i := 0; i < depth; i++ { @@ -41,7 +41,7 @@ func (l *level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf [ depth += 1 objitems := 0 for name, mc := range m.metrics { - if b := l.metrics[mc.offset]; b != nil { + if b := l.metrics[mc.Offset]; b != nil { for i := 0; i < depth; i++ { buf = append(buf, '\t') } @@ -55,7 +55,7 @@ func (l *level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf [ } } - for name, lvl := range l.children { + for name, lvl := range l.sublevels { _, err := w.Write(buf) if err != nil { return nil, err diff --git a/internal/memstore/memstore.go b/internal/memstore/memstore.go index 8056942..c7e4b5f 100644 --- a/internal/memstore/memstore.go +++ b/internal/memstore/memstore.go @@ -4,6 +4,7 @@ import ( "errors" "math" "sync" + "unsafe" "github.com/ClusterCockpit/cc-metric-store/internal/types" ) @@ -139,6 +140,26 @@ func (l *Level) countValues(from int64, to int64) int { return vals } +func (l *Level) sizeInBytes() int64 { + l.lock.RLock() + defer l.lock.RUnlock() + size := int64(0) + + for _, b := range l.metrics { + for b != nil { + size += int64(len(b.data)) + b = b.prev + } + } + size *= int64(unsafe.Sizeof(types.Float(0))) + + for _, child := range l.sublevels { + size += child.sizeInBytes() + } + + return size +} + type MemoryStore struct { root Level // root of the tree structure // TODO... @@ -388,3 +409,90 @@ func (m *MemoryStore) Read(selector types.Selector, metric string, from, to int6 return data, from, to, nil } + +// Returns statistics for the requested metric on the selected node/level. +// Data is aggregated to the selected level the same way as in `MemoryStore.Read`. +// If `Stats.Samples` is zero, the statistics should not be considered as valid. +func (m *MemoryStore) Stats(selector types.Selector, metric string, from, to int64) (*types.Stats, int64, int64, error) { + if from > to { + return nil, 0, 0, errors.New("invalid time range") + } + + mc, ok := m.metrics[metric] + if !ok { + return nil, 0, 0, errors.New("unkown metric: " + metric) + } + + n, samples := 0, 0 + avg, min, max := types.Float(0), math.MaxFloat32, -math.MaxFloat32 + err := m.root.findBuffers(selector, mc.Offset, func(c *chunk) error { + stats, cfrom, cto, err := c.stats(from, to) + if err != nil { + return err + } + + if n == 0 { + from, to = cfrom, cto + } else if from != cfrom || to != cto { + return ErrDataDoesNotAlign + } + + samples += stats.Samples + avg += stats.Avg + min = math.Min(min, float64(stats.Min)) + max = math.Max(max, float64(stats.Max)) + n += 1 + return nil + }) + + if err != nil { + return nil, 0, 0, err + } + + if n == 0 { + return nil, 0, 0, ErrNoData + } + + if mc.Aggregation == types.AvgAggregation { + avg /= types.Float(n) + } else if n > 1 && mc.Aggregation != types.SumAggregation { + return nil, 0, 0, errors.New("invalid aggregation") + } + + return &types.Stats{ + Samples: samples, + Avg: avg, + Min: types.Float(min), + Max: types.Float(max), + }, from, to, nil +} + +// Given a selector, return a list of all children of the level selected. +func (m *MemoryStore) ListChildren(selector []string) []string { + lvl := &m.root + for lvl != nil && len(selector) != 0 { + lvl.lock.RLock() + next := lvl.sublevels[selector[0]] + lvl.lock.RUnlock() + lvl = next + selector = selector[1:] + } + + if lvl == nil { + return nil + } + + lvl.lock.RLock() + defer lvl.lock.RUnlock() + + children := make([]string, 0, len(lvl.sublevels)) + for child := range lvl.sublevels { + children = append(children, child) + } + + return children +} + +func (m *MemoryStore) SizeInBytes() int64 { + return m.root.sizeInBytes() +} diff --git a/internal/memstore/memstore_test.go b/internal/memstore/memstore_test.go index 8821e7a..240ca1e 100644 --- a/internal/memstore/memstore_test.go +++ b/internal/memstore/memstore_test.go @@ -1,4 +1,4 @@ -package main +package memstore import ( "fmt" @@ -6,20 +6,22 @@ import ( "math/rand" "sync" "testing" + + "github.com/ClusterCockpit/cc-metric-store/internal/types" ) func TestMemoryStoreBasics(t *testing.T) { frequency := int64(10) start, count := int64(100), int64(5000) - store := NewMemoryStore(map[string]MetricConfig{ + store := NewMemoryStore(map[string]types.MetricConfig{ "a": {Frequency: frequency}, "b": {Frequency: frequency * 2}, }) for i := int64(0); i < count; i++ { - err := store.Write([]string{"testhost"}, start+i*frequency, []Metric{ - {Name: "a", Value: Float(i)}, - {Name: "b", Value: Float(i / 2)}, + err := store.Write([]string{"testhost"}, start+i*frequency, []types.Metric{ + {Name: "a", Value: types.Float(i)}, + {Name: "b", Value: types.Float(i / 2)}, }) if err != nil { t.Error(err) @@ -27,7 +29,7 @@ func TestMemoryStoreBasics(t *testing.T) { } } - sel := Selector{{String: "testhost"}} + sel := types.Selector{{String: "testhost"}} adata, from, to, err := store.Read(sel, "a", start, start+count*frequency) if err != nil || from != start || to != start+count*frequency { t.Error(err) @@ -45,14 +47,14 @@ func TestMemoryStoreBasics(t *testing.T) { } for i := 0; i < int(count); i++ { - if adata[i] != Float(i) { - t.Errorf("incorrect value for metric a (%f vs. %f)", adata[i], Float(i)) + if adata[i] != types.Float(i) { + t.Errorf("incorrect value for metric a (%f vs. %f)", adata[i], types.Float(i)) return } } for i := 0; i < int(count/2); i++ { - if bdata[i] != Float(i) && bdata[i] != Float(i-1) { + if bdata[i] != types.Float(i) && bdata[i] != types.Float(i-1) { t.Errorf("incorrect value for metric b (%f) at index %d", bdata[i], i) return } @@ -62,8 +64,8 @@ func TestMemoryStoreBasics(t *testing.T) { func TestMemoryStoreTooMuchWrites(t *testing.T) { frequency := int64(10) - count := BUFFER_CAP*3 + 10 - store := NewMemoryStore(map[string]MetricConfig{ + count := bufferSizeInFloats*3 + 10 + store := NewMemoryStore(map[string]types.MetricConfig{ "a": {Frequency: frequency}, "b": {Frequency: frequency * 2}, "c": {Frequency: frequency / 2}, @@ -72,33 +74,33 @@ func TestMemoryStoreTooMuchWrites(t *testing.T) { start := int64(100) for i := 0; i < count; i++ { - if err := store.Write([]string{"test"}, start+int64(i)*frequency, []Metric{ - {Name: "a", Value: Float(i)}, - {Name: "b", Value: Float(i / 2)}, - {Name: "c", Value: Float(i * 2)}, - {Name: "d", Value: Float(i / 3)}, + if err := store.Write([]string{"test"}, start+int64(i)*frequency, []types.Metric{ + {Name: "a", Value: types.Float(i)}, + {Name: "b", Value: types.Float(i / 2)}, + {Name: "c", Value: types.Float(i * 2)}, + {Name: "d", Value: types.Float(i / 3)}, }); err != nil { t.Fatal(err) } } end := start + int64(count)*frequency - data, from, to, err := store.Read(Selector{{String: "test"}}, "a", start, end) + data, from, to, err := store.Read(types.Selector{{String: "test"}}, "a", start, end) if len(data) != count || from != start || to != end || err != nil { t.Fatalf("a: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) } - data, from, to, err = store.Read(Selector{{String: "test"}}, "b", start, end) + data, from, to, err = store.Read(types.Selector{{String: "test"}}, "b", start, end) if len(data) != count/2 || from != start || to != end || err != nil { t.Fatalf("b: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) } - data, from, to, err = store.Read(Selector{{String: "test"}}, "c", start, end) + data, from, to, err = store.Read(types.Selector{{String: "test"}}, "c", start, end) if len(data) != count*2-1 || from != start || to != end-frequency/2 || err != nil { t.Fatalf("c: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) } - data, from, to, err = store.Read(Selector{{String: "test"}}, "d", start, end) + data, from, to, err = store.Read(types.Selector{{String: "test"}}, "d", start, end) if len(data) != count/3+1 || from != start || to != end+frequency*2 || err != nil { t.Errorf("expected: err=nil, from=%d, to=%d, len(data)=%d\n", start, end+frequency*2, count/3) t.Fatalf("d: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data) @@ -108,19 +110,19 @@ func TestMemoryStoreTooMuchWrites(t *testing.T) { func TestMemoryStoreOutOfBounds(t *testing.T) { count := 2000 toffset := 1000 - store := NewMemoryStore(map[string]MetricConfig{ + store := NewMemoryStore(map[string]types.MetricConfig{ "a": {Frequency: 60}, }) for i := 0; i < count; i++ { - if err := store.Write([]string{"cluster", "host", "cpu"}, int64(toffset+i*60), []Metric{ - {Name: "a", Value: Float(i)}, + if err := store.Write([]string{"cluster", "host", "cpu"}, int64(toffset+i*60), []types.Metric{ + {Name: "a", Value: types.Float(i)}, }); err != nil { t.Fatal(err) } } - sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu"}} + sel := types.Selector{{String: "cluster"}, {String: "host"}, {String: "cpu"}} data, from, to, err := store.Read(sel, "a", 500, int64(toffset+count*60+500)) if err != nil { t.Fatal(err) @@ -130,9 +132,9 @@ func TestMemoryStoreOutOfBounds(t *testing.T) { t.Fatalf("Got %d-%d, expected %d-%d", from, to, toffset, toffset+count*60) } - if len(data) != count || data[0] != 0 || data[len(data)-1] != Float((count-1)) { + if len(data) != count || data[0] != 0 || data[len(data)-1] != types.Float((count-1)) { t.Fatalf("Wrong data (got: %d, %f, %f, expected: %d, %f, %f)", - len(data), data[0], data[len(data)-1], count, 0., Float(count-1)) + len(data), data[0], data[len(data)-1], count, 0., types.Float(count-1)) } testfrom, testlen := int64(100000000), int64(10000) @@ -150,7 +152,7 @@ func TestMemoryStoreOutOfBounds(t *testing.T) { func TestMemoryStoreMissingDatapoints(t *testing.T) { count := 3000 - store := NewMemoryStore(map[string]MetricConfig{ + store := NewMemoryStore(map[string]types.MetricConfig{ "a": {Frequency: 1}, }) @@ -159,8 +161,8 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) { continue } - err := store.Write([]string{"testhost"}, int64(i), []Metric{ - {Name: "a", Value: Float(i)}, + err := store.Write([]string{"testhost"}, int64(i), []types.Metric{ + {Name: "a", Value: types.Float(i)}, }) if err != nil { t.Error(err) @@ -168,7 +170,7 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) { } } - sel := Selector{{String: "testhost"}} + sel := types.Selector{{String: "testhost"}} adata, _, _, err := store.Read(sel, "a", 0, int64(count)) if err != nil { t.Error(err) @@ -182,7 +184,7 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) { for i := 0; i < count-2; i++ { if i%3 == 0 { - if adata[i] != Float(i) { + if adata[i] != types.Float(i) { t.Error("unexpected value") return } @@ -197,21 +199,21 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) { func TestMemoryStoreAggregation(t *testing.T) { count := 3000 - store := NewMemoryStore(map[string]MetricConfig{ - "a": {Frequency: 1, Aggregation: SumAggregation}, + store := NewMemoryStore(map[string]types.MetricConfig{ + "a": {Frequency: 1, Aggregation: types.SumAggregation}, }) for i := 0; i < count; i++ { - err := store.Write([]string{"host0", "cpu0"}, int64(i), []Metric{ - {Name: "a", Value: Float(i) / 2.}, + err := store.Write([]string{"host0", "cpu0"}, int64(i), []types.Metric{ + {Name: "a", Value: types.Float(i) / 2.}, }) if err != nil { t.Error(err) return } - err = store.Write([]string{"host0", "cpu1"}, int64(i), []Metric{ - {Name: "a", Value: Float(i) * 2.}, + err = store.Write([]string{"host0", "cpu1"}, int64(i), []types.Metric{ + {Name: "a", Value: types.Float(i) * 2.}, }) if err != nil { t.Error(err) @@ -219,7 +221,7 @@ func TestMemoryStoreAggregation(t *testing.T) { } } - adata, from, to, err := store.Read(Selector{{String: "host0"}}, "a", int64(0), int64(count)) + adata, from, to, err := store.Read(types.Selector{{String: "host0"}}, "a", int64(0), int64(count)) if err != nil { t.Error(err) return @@ -231,7 +233,7 @@ func TestMemoryStoreAggregation(t *testing.T) { } for i := 0; i < count; i++ { - expected := Float(i)/2. + Float(i)*2. + expected := types.Float(i)/2. + types.Float(i)*2. if adata[i] != expected { t.Errorf("expected: %f, got: %f", expected, adata[i]) return @@ -241,9 +243,9 @@ func TestMemoryStoreAggregation(t *testing.T) { func TestMemoryStoreStats(t *testing.T) { count := 3000 - store := NewMemoryStore(map[string]MetricConfig{ + store := NewMemoryStore(map[string]types.MetricConfig{ "a": {Frequency: 1}, - "b": {Frequency: 1, Aggregation: AvgAggregation}, + "b": {Frequency: 1, Aggregation: types.AvgAggregation}, }) sel1 := []string{"cluster", "host1"} @@ -270,18 +272,18 @@ func TestMemoryStoreStats(t *testing.T) { bmin = math.Min(bmin, b) bmax = math.Max(bmax, b) - store.Write(sel1, int64(i), []Metric{ - {Name: "a", Value: Float(a)}, + store.Write(sel1, int64(i), []types.Metric{ + {Name: "a", Value: types.Float(a)}, }) - store.Write(sel2, int64(i), []Metric{ - {Name: "b", Value: Float(b)}, + store.Write(sel2, int64(i), []types.Metric{ + {Name: "b", Value: types.Float(b)}, }) - store.Write(sel3, int64(i), []Metric{ - {Name: "b", Value: Float(b)}, + store.Write(sel3, int64(i), []types.Metric{ + {Name: "b", Value: types.Float(b)}, }) } - stats, from, to, err := store.Stats(Selector{{String: "cluster"}, {String: "host1"}}, "a", 0, int64(count)) + stats, from, to, err := store.Stats(types.Selector{{String: "cluster"}, {String: "host1"}}, "a", 0, int64(count)) if err != nil { t.Fatal(err) } @@ -290,11 +292,11 @@ func TestMemoryStoreStats(t *testing.T) { t.Fatalf("unexpected: from=%d, to=%d, stats.Samples=%d (expected samples=%d)\n", from, to, stats.Samples, samples) } - if stats.Avg != Float(asum/float64(samples)) || stats.Min != Float(amin) || stats.Max != Float(amax) { + if stats.Avg != types.Float(asum/float64(samples)) || stats.Min != types.Float(amin) || stats.Max != types.Float(amax) { t.Fatalf("wrong stats: %#v\n", stats) } - stats, from, to, err = store.Stats(Selector{{String: "cluster"}, {String: "host2"}}, "b", 0, int64(count)) + stats, from, to, err = store.Stats(types.Selector{{String: "cluster"}, {String: "host2"}}, "b", 0, int64(count)) if err != nil { t.Fatal(err) } @@ -303,22 +305,22 @@ func TestMemoryStoreStats(t *testing.T) { t.Fatalf("unexpected: from=%d, to=%d, stats.Samples=%d (expected samples=%d)\n", from, to, stats.Samples, samples*2) } - if stats.Avg != Float(bsum/float64(samples*2)) || stats.Min != Float(bmin) || stats.Max != Float(bmax) { + if stats.Avg != types.Float(bsum/float64(samples*2)) || stats.Min != types.Float(bmin) || stats.Max != types.Float(bmax) { t.Fatalf("wrong stats: %#v (expected: avg=%f, min=%f, max=%f)\n", stats, bsum/float64(samples*2), bmin, bmax) } } func TestMemoryStoreArchive(t *testing.T) { - store1 := NewMemoryStore(map[string]MetricConfig{ + store1 := NewMemoryStore(map[string]types.MetricConfig{ "a": {Frequency: 1}, "b": {Frequency: 1}, }) count := 2000 for i := 0; i < count; i++ { - err := store1.Write([]string{"cluster", "host", "cpu0"}, 100+int64(i), []Metric{ - {Name: "a", Value: Float(i)}, - {Name: "b", Value: Float(i * 2)}, + err := store1.Write([]string{"cluster", "host", "cpu0"}, 100+int64(i), []types.Metric{ + {Name: "a", Value: types.Float(i)}, + {Name: "b", Value: types.Float(i * 2)}, }) if err != nil { t.Error(err) @@ -329,29 +331,29 @@ func TestMemoryStoreArchive(t *testing.T) { // store1.DebugDump(bufio.NewWriter(os.Stdout)) archiveRoot := t.TempDir() - _, err := store1.ToCheckpoint(archiveRoot, 100, 100+int64(count/2)) + _, err := store1.ToJSONCheckpoint(archiveRoot, 100, 100+int64(count/2)) if err != nil { t.Error(err) return } - _, err = store1.ToCheckpoint(archiveRoot, 100+int64(count/2), 100+int64(count)) + _, err = store1.ToJSONCheckpoint(archiveRoot, 100+int64(count/2), 100+int64(count)) if err != nil { t.Error(err) return } - store2 := NewMemoryStore(map[string]MetricConfig{ + store2 := NewMemoryStore(map[string]types.MetricConfig{ "a": {Frequency: 1}, "b": {Frequency: 1}, }) - n, err := store2.FromCheckpoint(archiveRoot, 100) + n, err := store2.FromJSONCheckpoint(archiveRoot, 100) if err != nil { t.Error(err) return } - sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu0"}} + sel := types.Selector{{String: "cluster"}, {String: "host"}, {String: "cpu0"}} adata, from, to, err := store2.Read(sel, "a", 100, int64(100+count)) if err != nil { t.Error(err) @@ -364,7 +366,7 @@ func TestMemoryStoreArchive(t *testing.T) { } for i := 0; i < count; i++ { - expected := Float(i) + expected := types.Float(i) if adata[i] != expected { t.Errorf("expected: %f, got: %f", expected, adata[i]) } @@ -372,7 +374,7 @@ func TestMemoryStoreArchive(t *testing.T) { } func TestMemoryStoreFree(t *testing.T) { - store := NewMemoryStore(map[string]MetricConfig{ + store := NewMemoryStore(map[string]types.MetricConfig{ "a": {Frequency: 1}, "b": {Frequency: 2}, }) @@ -380,44 +382,41 @@ func TestMemoryStoreFree(t *testing.T) { count := 3000 sel := []string{"cluster", "host", "1"} for i := 0; i < count; i++ { - err := store.Write(sel, int64(i), []Metric{ - {Name: "a", Value: Float(i)}, - {Name: "b", Value: Float(i)}, + err := store.Write(sel, int64(i), []types.Metric{ + {Name: "a", Value: types.Float(i)}, + {Name: "b", Value: types.Float(i)}, }) if err != nil { t.Fatal(err) } } - n, err := store.Free([]string{"cluster", "host"}, int64(BUFFER_CAP*2)+100) - if err != nil { - t.Fatal(err) - } + n := store.Free(int64(bufferSizeInFloats*2) + 100) if n != 3 { t.Fatal("two buffers expected to be released") } - adata, from, to, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "a", 0, int64(count)) + adata, from, to, err := store.Read(types.Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "a", 0, int64(count)) if err != nil { t.Fatal(err) } - if from != int64(BUFFER_CAP*2) || to != int64(count) || len(adata) != count-2*BUFFER_CAP { + if from != int64(bufferSizeInFloats*2) || to != int64(count) || len(adata) != count-2*bufferSizeInFloats { t.Fatalf("unexpected values from call to `Read`: from=%d, to=%d, len=%d", from, to, len(adata)) } - // bdata, from, to, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "b", 0, int64(count)) + // bdata, from, to, err := store.Read(types.Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "b", 0, int64(count)) // if err != nil { // t.Fatal(err) // } - // if from != int64(BUFFER_CAP*2) || to != int64(count) || len(bdata) != (count-2*BUFFER_CAP)/2 { + // if from != int64(bufferSizeInFloats*2) || to != int64(count) || len(bdata) != (count-2*bufferSizeInFloats)/2 { // t.Fatalf("unexpected values from call to `Read`: from=%d (expected: %d), to=%d (expected: %d), len=%d (expected: %d)", - // from, BUFFER_CAP*2, to, count, len(bdata), (count-2*BUFFER_CAP)/2) + // from, bufferSizeInFloats*2, to, count, len(bdata), (count-2*bufferSizeInFloats)/2) // } - if adata[0] != Float(BUFFER_CAP*2) || adata[len(adata)-1] != Float(count-1) { + if adata[0] != types.Float(bufferSizeInFloats*2) || adata[len(adata)-1] != types.Float(count-1) { t.Fatal("wrong values") } } @@ -426,7 +425,7 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) { frequency := int64(5) count := b.N goroutines := 4 - store := NewMemoryStore(map[string]MetricConfig{ + store := NewMemoryStore(map[string]types.MetricConfig{ "a": {Frequency: frequency}, }) @@ -437,8 +436,8 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) { go func(g int) { host := fmt.Sprintf("host%d", g) for i := 0; i < count; i++ { - store.Write([]string{"cluster", host, "cpu0"}, int64(i)*frequency, []Metric{ - {Name: "a", Value: Float(i)}, + store.Write([]string{"cluster", host, "cpu0"}, int64(i)*frequency, []types.Metric{ + {Name: "a", Value: types.Float(i)}, }) } wg.Done() @@ -450,7 +449,7 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) { for g := 0; g < goroutines; g++ { host := fmt.Sprintf("host%d", g) - sel := Selector{{String: "cluster"}, {String: host}, {String: "cpu0"}} + sel := types.Selector{{String: "cluster"}, {String: host}, {String: "cpu0"}} adata, _, _, err := store.Read(sel, "a", 0, int64(count)*frequency) if err != nil { b.Error(err) @@ -463,7 +462,7 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) { } for i := 0; i < count; i++ { - expected := Float(i) + expected := types.Float(i) if adata[i] != expected { b.Error("incorrect value for metric a") return @@ -475,23 +474,23 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) { func BenchmarkMemoryStoreAggregation(b *testing.B) { b.StopTimer() count := 2000 - store := NewMemoryStore(map[string]MetricConfig{ - "flops_any": {Frequency: 1, Aggregation: AvgAggregation}, + store := NewMemoryStore(map[string]types.MetricConfig{ + "flops_any": {Frequency: 1, Aggregation: types.AvgAggregation}, }) sel := []string{"testcluster", "host123", "cpu0"} for i := 0; i < count; i++ { sel[2] = "cpu0" - err := store.Write(sel, int64(i), []Metric{ - {Name: "flops_any", Value: Float(i)}, + err := store.Write(sel, int64(i), []types.Metric{ + {Name: "flops_any", Value: types.Float(i)}, }) if err != nil { b.Fatal(err) } sel[2] = "cpu1" - err = store.Write(sel, int64(i), []Metric{ - {Name: "flops_any", Value: Float(i)}, + err = store.Write(sel, int64(i), []types.Metric{ + {Name: "flops_any", Value: types.Float(i)}, }) if err != nil { b.Fatal(err) @@ -500,7 +499,7 @@ func BenchmarkMemoryStoreAggregation(b *testing.B) { b.StartTimer() for n := 0; n < b.N; n++ { - data, from, to, err := store.Read(Selector{{String: "testcluster"}, {String: "host123"}}, "flops_any", 0, int64(count)) + data, from, to, err := store.Read(types.Selector{{String: "testcluster"}, {String: "host123"}}, "flops_any", 0, int64(count)) if err != nil { b.Fatal(err) }