From a85b210096f72990abf101e5cb2ddbb65bbd7fba Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 9 Jun 2021 06:03:31 +0200 Subject: [PATCH] Initial checkin --- README.md | 5 +- TODO.md | 3 + config.json | 15 ++ fileStore.go | 65 ++++++ fileStore_test.go | 7 + go.mod | 9 + go.sum | 57 +++++ memoryStore.go | 171 +++++++++++++++ memoryStore_test.go | 502 ++++++++++++++++++++++++++++++++++++++++++++ metric-store.go | 89 ++++++++ 10 files changed, 921 insertions(+), 2 deletions(-) create mode 100644 TODO.md create mode 100644 config.json create mode 100644 fileStore.go create mode 100644 fileStore_test.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 memoryStore.go create mode 100644 memoryStore_test.go create mode 100644 metric-store.go diff --git a/README.md b/README.md index c22d9d4..16f9052 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ -# cc-metric-store -A simple in-memory metric store +# ClusterCockpit Metric Store + +FileStore is only a code fragment. To test the memoryStore move away the fileStore and run go test. diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..632c2d5 --- /dev/null +++ b/TODO.md @@ -0,0 +1,3 @@ +# Missing Testcases + + diff --git a/config.json b/config.json new file mode 100644 index 0000000..65e7105 --- /dev/null +++ b/config.json @@ -0,0 +1,15 @@ +{ + "root": "/home/jan/bla", + [ + { + "class": "N", + "frequency": 60, + "metrics": ["flops_any", "mem_bw"] + }, + { + "class": "C", + "frequency": 30, + "metrics": ["cpi", "flops_any"] + } + ] +} diff --git a/fileStore.go b/fileStore.go new file mode 100644 index 0000000..cf0a29d --- /dev/null +++ b/fileStore.go @@ -0,0 +1,65 @@ +package main + +import ( + "log" + "os" +) + +//MetricFile holds the state for a metric store file. +//It does not export any variable. +type FileStore struct { + metrics map[string]int + numMetrics int + size int64 + root string +} + +func getFileName(tp string, ts string, start int64) string { + +} + +func newFileStore(root string, size int64, o []string) { + var f FileStore + f.root = root + f.size = size + + for i, name := range o { + f.metrics[name] = i * f.size * 8 + } +} + +func openFile(fp string, hd *FileHeader) (f *File, err error) { + f, err = os.OpenFile(file, os.O_WRONLY, 0644) + + if err != nil { + return f, err + } + +} + +func createFile(fp string) (f *File, err error) { + f, err = os.Create(fp) + + if err != nil { + return f, err + } + +} + +func getFileHandle(file string, start int64) (f *File, err error) { + f, err = os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0644) + + if err != nil { + return f, err + } + + if _, err := f.Write([]byte("appended some data\n")); err != nil { + f.Close() // ignore error; Write error takes precedence + log.Fatal(err) + } + if err := f.Close(); err != nil { + log.Fatal(err) + } + + return f +} diff --git a/fileStore_test.go b/fileStore_test.go new file mode 100644 index 0000000..9e62fe3 --- /dev/null +++ b/fileStore_test.go @@ -0,0 +1,7 @@ +package main + +import "testing" + +func TestAddMetrics(t *testing.T) { + +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..c130b71 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module github.com/ClusterCockpit/cc-metric-store + +go 1.16 + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/nats-io/nats-server/v2 v2.2.6 // indirect + github.com/nats-io/nats.go v1.11.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..128881f --- /dev/null +++ b/go.sum @@ -0,0 +1,57 @@ +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk= +github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= +github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= +github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= +github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI= +github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= +github.com/nats-io/nats-server/v2 v2.2.6 h1:FPK9wWx9pagxcw14s8W9rlfzfyHm61uNLnJyybZbn48= +github.com/nats-io/nats-server/v2 v2.2.6/go.mod h1:sEnFaxqe09cDmfMgACxZbziXnhQFhwk+aKkZjBBRYrI= +github.com/nats-io/nats.go v1.11.0 h1:L263PZkrmkRJRJT2YHU8GwWWvEvmr9/LUKuJTXsF32k= +github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= +golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= diff --git a/memoryStore.go b/memoryStore.go new file mode 100644 index 0000000..3d9798f --- /dev/null +++ b/memoryStore.go @@ -0,0 +1,171 @@ +package main + +import ( + "fmt" + "math" +) + +type storeBuffer struct { + store []float64 + start int64 +} + +type buffer struct { + current *storeBuffer + next *storeBuffer +} + +//MemoryStore holds the state for a metric memory store. +//It does not export any variable. +type MemoryStore struct { + containers map[string]*buffer + offsets map[string]int + frequency int + numSlots int + numMetrics int +} + +func initBuffer(b *storeBuffer) { + for i := 0; i < len(b.store); i++ { + b.store[i] = math.NaN() + } +} + +func allocateBuffer(ts int64, size int) *buffer { + b := new(buffer) + s := make([]float64, size) + b.current = &storeBuffer{s, ts} + initBuffer(b.current) + + s = make([]float64, size) + b.next = &storeBuffer{s, 0} + initBuffer(b.next) + return b +} + +func switchBuffers(m *MemoryStore, b *buffer) { + initBuffer(b.next) + b.current, b.next = b.next, b.current + b.current.start = b.next.start + int64(m.numSlots*m.frequency) +} + +func newMemoryStore(o []string, n int, f int) *MemoryStore { + var m MemoryStore + m.frequency = f + m.numSlots = n + m.containers = make(map[string]*buffer) + m.offsets = make(map[string]int) + + for i, name := range o { + m.offsets[name] = i + } + + m.numMetrics = len(o) + + return &m +} + +// AddMetrics writes metrics to the memoryStore for entity key +// at unix epoch time ts. The unit of ts is seconds. +// An error is returned if ts is out of bounds of MemoryStore. +func (m *MemoryStore) AddMetrics( + key string, + ts int64, + metrics []Metric) error { + + b, ok := m.containers[key] + + if !ok { + //Key does not exist. Allocate new buffer. + m.containers[key] = allocateBuffer(ts, m.numMetrics*m.numSlots) + b = m.containers[key] + } + + index := int(ts-b.current.start) / m.frequency + + if index < 0 || index >= 2*m.numSlots { + return fmt.Errorf("ts %d out of bounds", ts) + } + + if index >= m.numSlots { + //Index exceeds buffer length. Switch buffers. + switchBuffers(m, b) + index = int(ts-b.current.start) / m.frequency + } + + s := b.current.store + + for _, metric := range metrics { + s[m.offsets[metric.Name]*m.numSlots+index] = metric.Value + } + + return nil +} + +// GetMetric returns a slize with metric values for timerange +// and entity key. Returns an error if key does not exist, +// stop is before start or start is in the future. +func (m *MemoryStore) GetMetric( + key string, + metric string, + from int64, + to int64) ([]float64, int64, error) { + + b, ok := m.containers[key] + + if !ok { + return nil, 0, fmt.Errorf("key %s does not exist", key) + } + + if to <= from { + return nil, 0, fmt.Errorf("invalid duration %d - %d", from, to) + } + + if from > b.current.start+int64(m.numSlots*m.frequency) { + return nil, 0, fmt.Errorf("from %d out of bounds", from) + } + + if to < b.next.start { + return nil, 0, fmt.Errorf("to %d out of bounds", to) + } + + var values1, values2 []float64 + offset := m.offsets[metric] + valuesFrom := from + + if from < b.current.start { + + var start, stop = 0, m.numSlots + + if from > b.next.start { + start = int(from-b.next.start) / m.frequency + } else { + valuesFrom = b.next.start + } + + if to < b.current.start { + stop = int(to-b.next.start) / m.frequency + } + + // fmt.Println("NEXT", start, stop) + values1 = b.next.store[offset+start : offset+stop] + } + + if to >= b.current.start { + + var start, stop = 0, m.numSlots + + if from > b.current.start { + start = int(from-b.current.start) / m.frequency + } + + if to <= b.current.start+int64(m.numSlots*m.frequency) { + stop = int(to-b.current.start) / m.frequency + } + + // fmt.Println("CURRENT", start, stop, b.current.start) + values2 = b.current.store[offset+start : offset+stop] + } + + return append(values1, values2...), valuesFrom, nil +} diff --git a/memoryStore_test.go b/memoryStore_test.go new file mode 100644 index 0000000..1602c8d --- /dev/null +++ b/memoryStore_test.go @@ -0,0 +1,502 @@ +package main + +import ( + "fmt" + "log" + "math" + "testing" +) + +var testMetrics [][]Metric = [][]Metric{ + {{"flops", 100.5}, {"mem_bw", 2088.67}}, + {{"flops", 180.5}, {"mem_bw", 4078.32}, {"mem_capacity", 1020}}, + {{"flops", 980.5}, {"mem_bw", 9078.32}, {"mem_capacity", 5010}}, + {{"flops", 940.5}, {"mem_bw", 9278.32}, {"mem_capacity", 6010}}, + {{"flops", 930.5}, {"mem_bw", 9378.32}, {"mem_capacity", 7010}}, + {{"flops", 980.5}, {"mem_bw", 9478.32}, {"mem_capacity", 8010}}, + {{"flops", 980.5}, {"mem_bw", 9478.32}, {"mem_capacity", 8010}}, + {{"flops", 980.5}, {"mem_bw", 9478.32}, {"mem_capacity", 8010}}, + {{"flops", 970.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2010}}, + {{"flops", 970.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2010}}} + +var testMetricsAlt [][]Metric = [][]Metric{ + {{"flops", 120.5}, {"mem_bw", 2080.67}}, + {{"flops", 130.5}, {"mem_bw", 4071.32}, {"mem_capacity", 1120}}, + {{"flops", 940.5}, {"mem_bw", 9072.32}, {"mem_capacity", 5210}}, + {{"flops", 950.5}, {"mem_bw", 9273.32}, {"mem_capacity", 6310}}, + {{"flops", 960.5}, {"mem_bw", 9374.32}, {"mem_capacity", 7410}}, + {{"flops", 970.5}, {"mem_bw", 9475.32}, {"mem_capacity", 8510}}, + {{"flops", 990.5}, {"mem_bw", 9476.32}, {"mem_capacity", 8610}}, + {{"flops", 910.5}, {"mem_bw", 9477.32}, {"mem_capacity", 8710}}, + {{"flops", 920.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2810}}, + {{"flops", 930.5}, {"mem_bw", 9179.32}, {"mem_capacity", 2910}}} + +func dumpStoreBuffer(s *storeBuffer) { + log.Printf("Start TS %d\n", s.start) + ctr := 0 + + for _, val := range s.store { + fmt.Printf("%f\t", val) + ctr++ + + if ctr == 10 { + fmt.Printf("\n") + ctr = 0 + } + } +} + +func printMemStore(m *MemoryStore) { + log.Println("########################") + log.Printf("Frequency %d, Metrics %d Slots %d\n", + m.frequency, m.numMetrics, m.numSlots) + log.Println("##Offsets") + for key, val := range m.offsets { + log.Printf("\t%s = %d\n", key, val) + } + log.Println("##Containers") + for key, c := range m.containers { + log.Printf("ID %s\n", key) + log.Println("###current") + dumpStoreBuffer(c.current) + log.Println("###next") + dumpStoreBuffer(c.next) + } + log.Println("########################") +} + +//############################ +//#### Whitebox tests ######## +//############################ +func TestAddMetricSimple(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + // printMemStore(m) + + m.AddMetrics(key, 1584022800, testMetrics[0]) + m.AddMetrics(key, 1584022890, testMetrics[1]) + + want := testMetrics[0][0].Value + got := m.containers[key].current.store[0] + if got != want { + t.Errorf("Want %f got %f\n", want, got) + } + + want = testMetrics[1][2].Value + got = m.containers[key].current.store[21] + if got != want { + t.Errorf("Want %f got %f\n", want, got) + } + // printMemStore(m) +} + +func TestAddMetricReplace(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + // printMemStore(m) + + m.AddMetrics(key, 1584022800, testMetrics[0]) + m.AddMetrics(key, 1584022800, testMetrics[1]) + + want := testMetrics[1][0].Value + got := m.containers[key].current.store[0] + if got != want { + t.Errorf("Want %f got %f\n", want, got) + } + + m.AddMetrics(key, 1584022850, testMetrics[0]) + want = testMetrics[0][0].Value + got = m.containers[key].current.store[0] + if got != want { + t.Errorf("Want %f got %f\n", want, got) + } + + m.AddMetrics(key, 1584022860, testMetrics[1]) + want = testMetrics[0][0].Value + got = m.containers[key].current.store[0] + if got != want { + t.Errorf("Want %f got %f\n", want, got) + } + // printMemStore(m) +} + +func TestAddMetricSwitch(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + // printMemStore(m) + + m.AddMetrics(key, 1584023000, testMetrics[0]) + m.AddMetrics(key, 1584023580, testMetrics[1]) + + want := testMetrics[1][2].Value + got := m.containers[key].current.store[29] + if got != want { + t.Errorf("Want %f got %f\n", want, got) + } + + m.AddMetrics(key, 1584023600, testMetrics[2]) + want = testMetrics[2][2].Value + got = m.containers[key].current.store[20] + if got != want { + t.Errorf("Want %f got %f\n", want, got) + } + + // printMemStore(m) +} + +//############################ +//#### Blackbox tests ######## +//############################ + +func TestAddMetricOutOfBounds(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 30, 60) + + err := m.AddMetrics(key, 1584023000, testMetrics[0]) + if err != nil { + t.Errorf("Got error 1584023000\n") + } + err = m.AddMetrics(key, 1584026600, testMetrics[0]) + if err == nil { + t.Errorf("Got no error 1584026600\n") + } + err = m.AddMetrics(key, 1584021580, testMetrics[1]) + if err == nil { + t.Errorf("Got no error 1584021580\n") + } + err = m.AddMetrics(key, 1584024580, testMetrics[1]) + if err != nil { + t.Errorf("Got error 1584024580\n") + } + err = m.AddMetrics(key, 1584091580, testMetrics[1]) + if err == nil { + t.Errorf("Got no error 1584091580\n") + } + err = m.AddMetrics(key, 1584024780, testMetrics[0]) + if err != nil { + t.Errorf("Got error 1584024780\n") + } +} + +func TestGetMetricPlainCurrent(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) + } + + // printMemStore(m) + val, tsFrom, err := m.GetMetric(key, "flops", 1584023000, 1584023560) + + if err != nil { + t.Errorf("Got error\n") + } + if tsFrom != 1584023000 { + t.Errorf("Start ts differs: %d\n", tsFrom) + } + if len(val) != 9 { + t.Errorf("Want 9. Got %d\n", len(val)) + } + if val[0] != 100.5 { + t.Errorf("Want 100.5 Got %f\n", val[0]) + } + if val[8] != 970.5 { + t.Errorf("Want 970.5 Got %f\n", val[9]) + } +} + +func TestGetMetricPlainNext(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) + } + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) + } + + // printMemStore(m) + val, tsFrom, err := m.GetMetric(key, "flops", 1584023000, 1584023560) + + if err != nil { + t.Errorf("Got error\n") + } + if tsFrom != 1584023000 { + t.Errorf("Start ts differs: %d\n", tsFrom) + } + if len(val) != 9 { + t.Errorf("Want 9. Got %d\n", len(val)) + } + if val[0] != 100.5 { + t.Errorf("Want 100.5 Got %f\n", val[0]) + } + if val[8] != 970.5 { + t.Errorf("Want 970.5 Got %f\n", val[9]) + } +} + +func TestGetMetricGap(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023000+i*120), testMetrics[i]) + } + + val, tsFrom, err := m.GetMetric(key, "flops", 1584023000, 1584023600) + + if err != nil { + t.Errorf("Got error\n") + } + if tsFrom != 1584023000 { + t.Errorf("Start ts differs: %d\n", tsFrom) + } + if len(val) != 10 { + t.Errorf("Want 10. Got %d\n", len(val)) + } + if val[0] != 100.5 { + t.Errorf("Want 100.5 Got %f\n", val[0]) + } + if !math.IsNaN(val[1]) { + t.Errorf("Want NaN Got %f\n", val[1]) + } + if val[0] != 100.5 { + t.Errorf("Want 100.5 Got %f\n", val[0]) + } + + // fmt.Println(val) +} + +func TestGetMetricSplit(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) + } + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) + } + + // printMemStore(m) + + val, tsFrom, err := m.GetMetric(key, "flops", 1584023200, 1584023860) + + if err != nil { + t.Errorf("Got error\n") + } + if tsFrom != 1584023200 { + t.Errorf("Start ts differs: %d\n", tsFrom) + } + if len(val) != 11 { + t.Errorf("Want 11. Got %d\n", len(val)) + } + if val[0] != 940.5 { + t.Errorf("Want 940.5 Got %f\n", val[0]) + } + if val[10] != 950.5 { + t.Errorf("Want 950.5 Got %f\n", val[0]) + } +} + +func TestGetMetricExceedNext(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) + } + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) + } + + // printMemStore(m) + + val, tsFrom, err := m.GetMetric(key, "flops", 1584022800, 1584023400) + + if err != nil { + t.Errorf("Got error\n") + } + if tsFrom != 1584023000 { + t.Errorf("Start ts differs: %d\n", tsFrom) + } + if len(val) != 6 { + t.Errorf("Want 6. Got %d\n", len(val)) + } + if val[0] != 100.5 { + t.Errorf("Want 100.5 Got %f\n", val[0]) + } + if val[5] != 980.5 { + t.Errorf("Want 980.5 Got %f\n", val[5]) + } +} + +func TestGetMetricExceedNextSplit(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) + } + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) + } + + // printMemStore(m) + + val, tsFrom, err := m.GetMetric(key, "flops", 1584022800, 1584023900) + + if err != nil { + t.Errorf("Got error\n") + } + if tsFrom != 1584023000 { + t.Errorf("Start ts differs: %d\n", tsFrom) + } + if len(val) != 15 { + t.Errorf("Want 14. Got %d\n", len(val)) + } + if val[0] != 100.5 { + t.Errorf("Want 100.5 Got %f\n", val[0]) + } + if val[14] != 960.5 { + t.Errorf("Want 960.5 Got %f\n", val[13]) + } +} + +func TestGetMetricExceedCurrent(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) + } + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) + } + + // printMemStore(m) + + val, tsFrom, err := m.GetMetric(key, "flops", 1584023800, 1584027900) + + if err != nil { + t.Errorf("Got error\n") + } + if tsFrom != 1584023800 { + t.Errorf("Start ts differs: %d\n", tsFrom) + } + if len(val) != 7 { + t.Errorf("Want 6. Got %d\n", len(val)) + } + if val[0] != 950.5 { + t.Errorf("Want 950.5 Got %f\n", val[0]) + } + if val[6] != 930.5 { + t.Errorf("Want 930.5 Got %f\n", val[5]) + } +} + +func TestGetMetricExceedCurrentSplit(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) + } + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) + } + + // printMemStore(m) + + val, tsFrom, err := m.GetMetric(key, "flops", 1584023120, 1584027900) + + if err != nil { + t.Errorf("Got error\n") + } + if tsFrom != 1584023120 { + t.Errorf("Start ts differs: %d\n", tsFrom) + } + if len(val) != 18 { + t.Errorf("Want 18. Got %d\n", len(val)) + } + if val[0] != 980.5 { + t.Errorf("Want 950.5 Got %f\n", val[0]) + } + if val[17] != 930.5 { + t.Errorf("Want 930.5 Got %f\n", val[17]) + } +} + +func TestGetMetricExceedBoth(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) + } + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) + } + + // printMemStore(m) + + val, tsFrom, err := m.GetMetric(key, "flops", 1584022800, 1584027900) + + if err != nil { + t.Errorf("Got error\n") + } + if tsFrom != 1584023000 { + t.Errorf("Start ts differs: %d\n", tsFrom) + } + if len(val) != 20 { + t.Errorf("Want 20. Got %d\n", len(val)) + } + if val[0] != 100.5 { + t.Errorf("Want 950.5 Got %f\n", val[0]) + } + if val[19] != 930.5 { + t.Errorf("Want 930.5 Got %f\n", val[17]) + } +} + +func TestGetMetricOutUpper(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) + } + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) + } + + // printMemStore(m) + + _, _, err := m.GetMetric(key, "flops", 1584032800, 1584037900) + + if err == nil { + t.Errorf("Got no error\n") + } +} + +func TestGetMetricOutLower(t *testing.T) { + key := "m1220" + m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60) + + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i]) + } + for i := 0; i < len(testMetrics); i++ { + m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i]) + } + + // printMemStore(m) + + _, _, err := m.GetMetric(key, "flops", 1584002800, 1584007900) + + if err == nil { + t.Errorf("Got no error\n") + } +} diff --git a/metric-store.go b/metric-store.go new file mode 100644 index 0000000..a83d0e2 --- /dev/null +++ b/metric-store.go @@ -0,0 +1,89 @@ +package main + +import ( + "bytes" + "encoding/gob" + "encoding/json" + "fmt" + "log" + "os" + "sync" + + nats "github.com/nats-io/nats.go" +) + +type Config struct { + MemoryStore struct { + Duration string `json:"duration"` + } `json:"memory_store"` + FileStore struct { + Duration string `json:"duration"` + } `json:"file_store"` + Root string `json:"root"` + Frequency int `json:"frequency"` + Metrics []string `json:"metrics"` +} + +type MetricData struct { + Name string + Values []float64 +} + +type Metric struct { + Name string + Value float64 +} + +type message struct { + Ts int64 + Tags []string + Fields []Metric +} + +var Conf Config + +func loadConfiguration(file string) Config { + var config Config + configFile, err := os.Open(file) + defer configFile.Close() + if err != nil { + fmt.Println(err.Error()) + } + jsonParser := json.NewDecoder(configFile) + jsonParser.Decode(&config) + return config +} + +func main() { + + Conf = loadConfiguration("config.json") + + // Connect to a server + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + var msgBuffer bytes.Buffer + dec := gob.NewDecoder(&msgBuffer) + + // Use a WaitGroup to wait for a message to arrive + wg := sync.WaitGroup{} + wg.Add(1) + + // Subscribe + if _, err := nc.Subscribe("updates", func(m *nats.Msg) { + log.Println(m.Subject) + var p message + err = dec.Decode(&p) + if err != nil { + log.Fatal("decode error 1:", err) + } + }); err != nil { + log.Fatal(err) + } + + // Wait for a message to come in + wg.Wait() +}