mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2024-11-10 05:07:25 +01:00
Initial checkin
This commit is contained in:
parent
a59049d24e
commit
a85b210096
@ -1,2 +1,3 @@
|
||||
# cc-metric-store
|
||||
A simple in-memory metric store
|
||||
# ClusterCockpit Metric Store
|
||||
|
||||
FileStore is only a code fragment. To test the memoryStore move away the fileStore and run go test.
|
||||
|
15
config.json
Normal file
15
config.json
Normal file
@ -0,0 +1,15 @@
|
||||
{
|
||||
"root": "/home/jan/bla",
|
||||
[
|
||||
{
|
||||
"class": "N",
|
||||
"frequency": 60,
|
||||
"metrics": ["flops_any", "mem_bw"]
|
||||
},
|
||||
{
|
||||
"class": "C",
|
||||
"frequency": 30,
|
||||
"metrics": ["cpi", "flops_any"]
|
||||
}
|
||||
]
|
||||
}
|
65
fileStore.go
Normal file
65
fileStore.go
Normal file
@ -0,0 +1,65 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
)
|
||||
|
||||
//MetricFile holds the state for a metric store file.
|
||||
//It does not export any variable.
|
||||
type FileStore struct {
|
||||
metrics map[string]int
|
||||
numMetrics int
|
||||
size int64
|
||||
root string
|
||||
}
|
||||
|
||||
func getFileName(tp string, ts string, start int64) string {
|
||||
|
||||
}
|
||||
|
||||
func newFileStore(root string, size int64, o []string) {
|
||||
var f FileStore
|
||||
f.root = root
|
||||
f.size = size
|
||||
|
||||
for i, name := range o {
|
||||
f.metrics[name] = i * f.size * 8
|
||||
}
|
||||
}
|
||||
|
||||
func openFile(fp string, hd *FileHeader) (f *File, err error) {
|
||||
f, err = os.OpenFile(file, os.O_WRONLY, 0644)
|
||||
|
||||
if err != nil {
|
||||
return f, err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func createFile(fp string) (f *File, err error) {
|
||||
f, err = os.Create(fp)
|
||||
|
||||
if err != nil {
|
||||
return f, err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func getFileHandle(file string, start int64) (f *File, err error) {
|
||||
f, err = os.OpenFile(file, os.O_CREATE|os.O_WRONLY, 0644)
|
||||
|
||||
if err != nil {
|
||||
return f, err
|
||||
}
|
||||
|
||||
if _, err := f.Write([]byte("appended some data\n")); err != nil {
|
||||
f.Close() // ignore error; Write error takes precedence
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
return f
|
||||
}
|
7
fileStore_test.go
Normal file
7
fileStore_test.go
Normal file
@ -0,0 +1,7 @@
|
||||
package main
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestAddMetrics(t *testing.T) {
|
||||
|
||||
}
|
9
go.mod
Normal file
9
go.mod
Normal file
@ -0,0 +1,9 @@
|
||||
module github.com/ClusterCockpit/cc-metric-store
|
||||
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/nats-io/nats-server/v2 v2.2.6 // indirect
|
||||
github.com/nats-io/nats.go v1.11.0
|
||||
)
|
57
go.sum
Normal file
57
go.sum
Normal file
@ -0,0 +1,57 @@
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
|
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
|
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk=
|
||||
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
|
||||
github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0=
|
||||
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
|
||||
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
|
||||
github.com/nats-io/jwt/v2 v2.0.2 h1:ejVCLO8gu6/4bOKIHQpmB5UhhUJfAQw55yvLWpfmKjI=
|
||||
github.com/nats-io/jwt/v2 v2.0.2/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
|
||||
github.com/nats-io/nats-server/v2 v2.2.6 h1:FPK9wWx9pagxcw14s8W9rlfzfyHm61uNLnJyybZbn48=
|
||||
github.com/nats-io/nats-server/v2 v2.2.6/go.mod h1:sEnFaxqe09cDmfMgACxZbziXnhQFhwk+aKkZjBBRYrI=
|
||||
github.com/nats-io/nats.go v1.11.0 h1:L263PZkrmkRJRJT2YHU8GwWWvEvmr9/LUKuJTXsF32k=
|
||||
github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
|
||||
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
|
||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
|
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
171
memoryStore.go
Normal file
171
memoryStore.go
Normal file
@ -0,0 +1,171 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
)
|
||||
|
||||
type storeBuffer struct {
|
||||
store []float64
|
||||
start int64
|
||||
}
|
||||
|
||||
type buffer struct {
|
||||
current *storeBuffer
|
||||
next *storeBuffer
|
||||
}
|
||||
|
||||
//MemoryStore holds the state for a metric memory store.
|
||||
//It does not export any variable.
|
||||
type MemoryStore struct {
|
||||
containers map[string]*buffer
|
||||
offsets map[string]int
|
||||
frequency int
|
||||
numSlots int
|
||||
numMetrics int
|
||||
}
|
||||
|
||||
func initBuffer(b *storeBuffer) {
|
||||
for i := 0; i < len(b.store); i++ {
|
||||
b.store[i] = math.NaN()
|
||||
}
|
||||
}
|
||||
|
||||
func allocateBuffer(ts int64, size int) *buffer {
|
||||
b := new(buffer)
|
||||
s := make([]float64, size)
|
||||
b.current = &storeBuffer{s, ts}
|
||||
initBuffer(b.current)
|
||||
|
||||
s = make([]float64, size)
|
||||
b.next = &storeBuffer{s, 0}
|
||||
initBuffer(b.next)
|
||||
return b
|
||||
}
|
||||
|
||||
func switchBuffers(m *MemoryStore, b *buffer) {
|
||||
initBuffer(b.next)
|
||||
b.current, b.next = b.next, b.current
|
||||
b.current.start = b.next.start + int64(m.numSlots*m.frequency)
|
||||
}
|
||||
|
||||
func newMemoryStore(o []string, n int, f int) *MemoryStore {
|
||||
var m MemoryStore
|
||||
m.frequency = f
|
||||
m.numSlots = n
|
||||
m.containers = make(map[string]*buffer)
|
||||
m.offsets = make(map[string]int)
|
||||
|
||||
for i, name := range o {
|
||||
m.offsets[name] = i
|
||||
}
|
||||
|
||||
m.numMetrics = len(o)
|
||||
|
||||
return &m
|
||||
}
|
||||
|
||||
// AddMetrics writes metrics to the memoryStore for entity key
|
||||
// at unix epoch time ts. The unit of ts is seconds.
|
||||
// An error is returned if ts is out of bounds of MemoryStore.
|
||||
func (m *MemoryStore) AddMetrics(
|
||||
key string,
|
||||
ts int64,
|
||||
metrics []Metric) error {
|
||||
|
||||
b, ok := m.containers[key]
|
||||
|
||||
if !ok {
|
||||
//Key does not exist. Allocate new buffer.
|
||||
m.containers[key] = allocateBuffer(ts, m.numMetrics*m.numSlots)
|
||||
b = m.containers[key]
|
||||
}
|
||||
|
||||
index := int(ts-b.current.start) / m.frequency
|
||||
|
||||
if index < 0 || index >= 2*m.numSlots {
|
||||
return fmt.Errorf("ts %d out of bounds", ts)
|
||||
}
|
||||
|
||||
if index >= m.numSlots {
|
||||
//Index exceeds buffer length. Switch buffers.
|
||||
switchBuffers(m, b)
|
||||
index = int(ts-b.current.start) / m.frequency
|
||||
}
|
||||
|
||||
s := b.current.store
|
||||
|
||||
for _, metric := range metrics {
|
||||
s[m.offsets[metric.Name]*m.numSlots+index] = metric.Value
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetMetric returns a slize with metric values for timerange
|
||||
// and entity key. Returns an error if key does not exist,
|
||||
// stop is before start or start is in the future.
|
||||
func (m *MemoryStore) GetMetric(
|
||||
key string,
|
||||
metric string,
|
||||
from int64,
|
||||
to int64) ([]float64, int64, error) {
|
||||
|
||||
b, ok := m.containers[key]
|
||||
|
||||
if !ok {
|
||||
return nil, 0, fmt.Errorf("key %s does not exist", key)
|
||||
}
|
||||
|
||||
if to <= from {
|
||||
return nil, 0, fmt.Errorf("invalid duration %d - %d", from, to)
|
||||
}
|
||||
|
||||
if from > b.current.start+int64(m.numSlots*m.frequency) {
|
||||
return nil, 0, fmt.Errorf("from %d out of bounds", from)
|
||||
}
|
||||
|
||||
if to < b.next.start {
|
||||
return nil, 0, fmt.Errorf("to %d out of bounds", to)
|
||||
}
|
||||
|
||||
var values1, values2 []float64
|
||||
offset := m.offsets[metric]
|
||||
valuesFrom := from
|
||||
|
||||
if from < b.current.start {
|
||||
|
||||
var start, stop = 0, m.numSlots
|
||||
|
||||
if from > b.next.start {
|
||||
start = int(from-b.next.start) / m.frequency
|
||||
} else {
|
||||
valuesFrom = b.next.start
|
||||
}
|
||||
|
||||
if to < b.current.start {
|
||||
stop = int(to-b.next.start) / m.frequency
|
||||
}
|
||||
|
||||
// fmt.Println("NEXT", start, stop)
|
||||
values1 = b.next.store[offset+start : offset+stop]
|
||||
}
|
||||
|
||||
if to >= b.current.start {
|
||||
|
||||
var start, stop = 0, m.numSlots
|
||||
|
||||
if from > b.current.start {
|
||||
start = int(from-b.current.start) / m.frequency
|
||||
}
|
||||
|
||||
if to <= b.current.start+int64(m.numSlots*m.frequency) {
|
||||
stop = int(to-b.current.start) / m.frequency
|
||||
}
|
||||
|
||||
// fmt.Println("CURRENT", start, stop, b.current.start)
|
||||
values2 = b.current.store[offset+start : offset+stop]
|
||||
}
|
||||
|
||||
return append(values1, values2...), valuesFrom, nil
|
||||
}
|
502
memoryStore_test.go
Normal file
502
memoryStore_test.go
Normal file
@ -0,0 +1,502 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var testMetrics [][]Metric = [][]Metric{
|
||||
{{"flops", 100.5}, {"mem_bw", 2088.67}},
|
||||
{{"flops", 180.5}, {"mem_bw", 4078.32}, {"mem_capacity", 1020}},
|
||||
{{"flops", 980.5}, {"mem_bw", 9078.32}, {"mem_capacity", 5010}},
|
||||
{{"flops", 940.5}, {"mem_bw", 9278.32}, {"mem_capacity", 6010}},
|
||||
{{"flops", 930.5}, {"mem_bw", 9378.32}, {"mem_capacity", 7010}},
|
||||
{{"flops", 980.5}, {"mem_bw", 9478.32}, {"mem_capacity", 8010}},
|
||||
{{"flops", 980.5}, {"mem_bw", 9478.32}, {"mem_capacity", 8010}},
|
||||
{{"flops", 980.5}, {"mem_bw", 9478.32}, {"mem_capacity", 8010}},
|
||||
{{"flops", 970.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2010}},
|
||||
{{"flops", 970.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2010}}}
|
||||
|
||||
var testMetricsAlt [][]Metric = [][]Metric{
|
||||
{{"flops", 120.5}, {"mem_bw", 2080.67}},
|
||||
{{"flops", 130.5}, {"mem_bw", 4071.32}, {"mem_capacity", 1120}},
|
||||
{{"flops", 940.5}, {"mem_bw", 9072.32}, {"mem_capacity", 5210}},
|
||||
{{"flops", 950.5}, {"mem_bw", 9273.32}, {"mem_capacity", 6310}},
|
||||
{{"flops", 960.5}, {"mem_bw", 9374.32}, {"mem_capacity", 7410}},
|
||||
{{"flops", 970.5}, {"mem_bw", 9475.32}, {"mem_capacity", 8510}},
|
||||
{{"flops", 990.5}, {"mem_bw", 9476.32}, {"mem_capacity", 8610}},
|
||||
{{"flops", 910.5}, {"mem_bw", 9477.32}, {"mem_capacity", 8710}},
|
||||
{{"flops", 920.5}, {"mem_bw", 9178.32}, {"mem_capacity", 2810}},
|
||||
{{"flops", 930.5}, {"mem_bw", 9179.32}, {"mem_capacity", 2910}}}
|
||||
|
||||
func dumpStoreBuffer(s *storeBuffer) {
|
||||
log.Printf("Start TS %d\n", s.start)
|
||||
ctr := 0
|
||||
|
||||
for _, val := range s.store {
|
||||
fmt.Printf("%f\t", val)
|
||||
ctr++
|
||||
|
||||
if ctr == 10 {
|
||||
fmt.Printf("\n")
|
||||
ctr = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func printMemStore(m *MemoryStore) {
|
||||
log.Println("########################")
|
||||
log.Printf("Frequency %d, Metrics %d Slots %d\n",
|
||||
m.frequency, m.numMetrics, m.numSlots)
|
||||
log.Println("##Offsets")
|
||||
for key, val := range m.offsets {
|
||||
log.Printf("\t%s = %d\n", key, val)
|
||||
}
|
||||
log.Println("##Containers")
|
||||
for key, c := range m.containers {
|
||||
log.Printf("ID %s\n", key)
|
||||
log.Println("###current")
|
||||
dumpStoreBuffer(c.current)
|
||||
log.Println("###next")
|
||||
dumpStoreBuffer(c.next)
|
||||
}
|
||||
log.Println("########################")
|
||||
}
|
||||
|
||||
//############################
|
||||
//#### Whitebox tests ########
|
||||
//############################
|
||||
func TestAddMetricSimple(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
// printMemStore(m)
|
||||
|
||||
m.AddMetrics(key, 1584022800, testMetrics[0])
|
||||
m.AddMetrics(key, 1584022890, testMetrics[1])
|
||||
|
||||
want := testMetrics[0][0].Value
|
||||
got := m.containers[key].current.store[0]
|
||||
if got != want {
|
||||
t.Errorf("Want %f got %f\n", want, got)
|
||||
}
|
||||
|
||||
want = testMetrics[1][2].Value
|
||||
got = m.containers[key].current.store[21]
|
||||
if got != want {
|
||||
t.Errorf("Want %f got %f\n", want, got)
|
||||
}
|
||||
// printMemStore(m)
|
||||
}
|
||||
|
||||
func TestAddMetricReplace(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
// printMemStore(m)
|
||||
|
||||
m.AddMetrics(key, 1584022800, testMetrics[0])
|
||||
m.AddMetrics(key, 1584022800, testMetrics[1])
|
||||
|
||||
want := testMetrics[1][0].Value
|
||||
got := m.containers[key].current.store[0]
|
||||
if got != want {
|
||||
t.Errorf("Want %f got %f\n", want, got)
|
||||
}
|
||||
|
||||
m.AddMetrics(key, 1584022850, testMetrics[0])
|
||||
want = testMetrics[0][0].Value
|
||||
got = m.containers[key].current.store[0]
|
||||
if got != want {
|
||||
t.Errorf("Want %f got %f\n", want, got)
|
||||
}
|
||||
|
||||
m.AddMetrics(key, 1584022860, testMetrics[1])
|
||||
want = testMetrics[0][0].Value
|
||||
got = m.containers[key].current.store[0]
|
||||
if got != want {
|
||||
t.Errorf("Want %f got %f\n", want, got)
|
||||
}
|
||||
// printMemStore(m)
|
||||
}
|
||||
|
||||
func TestAddMetricSwitch(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
// printMemStore(m)
|
||||
|
||||
m.AddMetrics(key, 1584023000, testMetrics[0])
|
||||
m.AddMetrics(key, 1584023580, testMetrics[1])
|
||||
|
||||
want := testMetrics[1][2].Value
|
||||
got := m.containers[key].current.store[29]
|
||||
if got != want {
|
||||
t.Errorf("Want %f got %f\n", want, got)
|
||||
}
|
||||
|
||||
m.AddMetrics(key, 1584023600, testMetrics[2])
|
||||
want = testMetrics[2][2].Value
|
||||
got = m.containers[key].current.store[20]
|
||||
if got != want {
|
||||
t.Errorf("Want %f got %f\n", want, got)
|
||||
}
|
||||
|
||||
// printMemStore(m)
|
||||
}
|
||||
|
||||
//############################
|
||||
//#### Blackbox tests ########
|
||||
//############################
|
||||
|
||||
func TestAddMetricOutOfBounds(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 30, 60)
|
||||
|
||||
err := m.AddMetrics(key, 1584023000, testMetrics[0])
|
||||
if err != nil {
|
||||
t.Errorf("Got error 1584023000\n")
|
||||
}
|
||||
err = m.AddMetrics(key, 1584026600, testMetrics[0])
|
||||
if err == nil {
|
||||
t.Errorf("Got no error 1584026600\n")
|
||||
}
|
||||
err = m.AddMetrics(key, 1584021580, testMetrics[1])
|
||||
if err == nil {
|
||||
t.Errorf("Got no error 1584021580\n")
|
||||
}
|
||||
err = m.AddMetrics(key, 1584024580, testMetrics[1])
|
||||
if err != nil {
|
||||
t.Errorf("Got error 1584024580\n")
|
||||
}
|
||||
err = m.AddMetrics(key, 1584091580, testMetrics[1])
|
||||
if err == nil {
|
||||
t.Errorf("Got no error 1584091580\n")
|
||||
}
|
||||
err = m.AddMetrics(key, 1584024780, testMetrics[0])
|
||||
if err != nil {
|
||||
t.Errorf("Got error 1584024780\n")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMetricPlainCurrent(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i])
|
||||
}
|
||||
|
||||
// printMemStore(m)
|
||||
val, tsFrom, err := m.GetMetric(key, "flops", 1584023000, 1584023560)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Got error\n")
|
||||
}
|
||||
if tsFrom != 1584023000 {
|
||||
t.Errorf("Start ts differs: %d\n", tsFrom)
|
||||
}
|
||||
if len(val) != 9 {
|
||||
t.Errorf("Want 9. Got %d\n", len(val))
|
||||
}
|
||||
if val[0] != 100.5 {
|
||||
t.Errorf("Want 100.5 Got %f\n", val[0])
|
||||
}
|
||||
if val[8] != 970.5 {
|
||||
t.Errorf("Want 970.5 Got %f\n", val[9])
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMetricPlainNext(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i])
|
||||
}
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i])
|
||||
}
|
||||
|
||||
// printMemStore(m)
|
||||
val, tsFrom, err := m.GetMetric(key, "flops", 1584023000, 1584023560)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Got error\n")
|
||||
}
|
||||
if tsFrom != 1584023000 {
|
||||
t.Errorf("Start ts differs: %d\n", tsFrom)
|
||||
}
|
||||
if len(val) != 9 {
|
||||
t.Errorf("Want 9. Got %d\n", len(val))
|
||||
}
|
||||
if val[0] != 100.5 {
|
||||
t.Errorf("Want 100.5 Got %f\n", val[0])
|
||||
}
|
||||
if val[8] != 970.5 {
|
||||
t.Errorf("Want 970.5 Got %f\n", val[9])
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMetricGap(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023000+i*120), testMetrics[i])
|
||||
}
|
||||
|
||||
val, tsFrom, err := m.GetMetric(key, "flops", 1584023000, 1584023600)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Got error\n")
|
||||
}
|
||||
if tsFrom != 1584023000 {
|
||||
t.Errorf("Start ts differs: %d\n", tsFrom)
|
||||
}
|
||||
if len(val) != 10 {
|
||||
t.Errorf("Want 10. Got %d\n", len(val))
|
||||
}
|
||||
if val[0] != 100.5 {
|
||||
t.Errorf("Want 100.5 Got %f\n", val[0])
|
||||
}
|
||||
if !math.IsNaN(val[1]) {
|
||||
t.Errorf("Want NaN Got %f\n", val[1])
|
||||
}
|
||||
if val[0] != 100.5 {
|
||||
t.Errorf("Want 100.5 Got %f\n", val[0])
|
||||
}
|
||||
|
||||
// fmt.Println(val)
|
||||
}
|
||||
|
||||
func TestGetMetricSplit(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i])
|
||||
}
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i])
|
||||
}
|
||||
|
||||
// printMemStore(m)
|
||||
|
||||
val, tsFrom, err := m.GetMetric(key, "flops", 1584023200, 1584023860)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Got error\n")
|
||||
}
|
||||
if tsFrom != 1584023200 {
|
||||
t.Errorf("Start ts differs: %d\n", tsFrom)
|
||||
}
|
||||
if len(val) != 11 {
|
||||
t.Errorf("Want 11. Got %d\n", len(val))
|
||||
}
|
||||
if val[0] != 940.5 {
|
||||
t.Errorf("Want 940.5 Got %f\n", val[0])
|
||||
}
|
||||
if val[10] != 950.5 {
|
||||
t.Errorf("Want 950.5 Got %f\n", val[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMetricExceedNext(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i])
|
||||
}
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i])
|
||||
}
|
||||
|
||||
// printMemStore(m)
|
||||
|
||||
val, tsFrom, err := m.GetMetric(key, "flops", 1584022800, 1584023400)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Got error\n")
|
||||
}
|
||||
if tsFrom != 1584023000 {
|
||||
t.Errorf("Start ts differs: %d\n", tsFrom)
|
||||
}
|
||||
if len(val) != 6 {
|
||||
t.Errorf("Want 6. Got %d\n", len(val))
|
||||
}
|
||||
if val[0] != 100.5 {
|
||||
t.Errorf("Want 100.5 Got %f\n", val[0])
|
||||
}
|
||||
if val[5] != 980.5 {
|
||||
t.Errorf("Want 980.5 Got %f\n", val[5])
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMetricExceedNextSplit(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i])
|
||||
}
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i])
|
||||
}
|
||||
|
||||
// printMemStore(m)
|
||||
|
||||
val, tsFrom, err := m.GetMetric(key, "flops", 1584022800, 1584023900)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Got error\n")
|
||||
}
|
||||
if tsFrom != 1584023000 {
|
||||
t.Errorf("Start ts differs: %d\n", tsFrom)
|
||||
}
|
||||
if len(val) != 15 {
|
||||
t.Errorf("Want 14. Got %d\n", len(val))
|
||||
}
|
||||
if val[0] != 100.5 {
|
||||
t.Errorf("Want 100.5 Got %f\n", val[0])
|
||||
}
|
||||
if val[14] != 960.5 {
|
||||
t.Errorf("Want 960.5 Got %f\n", val[13])
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMetricExceedCurrent(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i])
|
||||
}
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i])
|
||||
}
|
||||
|
||||
// printMemStore(m)
|
||||
|
||||
val, tsFrom, err := m.GetMetric(key, "flops", 1584023800, 1584027900)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Got error\n")
|
||||
}
|
||||
if tsFrom != 1584023800 {
|
||||
t.Errorf("Start ts differs: %d\n", tsFrom)
|
||||
}
|
||||
if len(val) != 7 {
|
||||
t.Errorf("Want 6. Got %d\n", len(val))
|
||||
}
|
||||
if val[0] != 950.5 {
|
||||
t.Errorf("Want 950.5 Got %f\n", val[0])
|
||||
}
|
||||
if val[6] != 930.5 {
|
||||
t.Errorf("Want 930.5 Got %f\n", val[5])
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMetricExceedCurrentSplit(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i])
|
||||
}
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i])
|
||||
}
|
||||
|
||||
// printMemStore(m)
|
||||
|
||||
val, tsFrom, err := m.GetMetric(key, "flops", 1584023120, 1584027900)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Got error\n")
|
||||
}
|
||||
if tsFrom != 1584023120 {
|
||||
t.Errorf("Start ts differs: %d\n", tsFrom)
|
||||
}
|
||||
if len(val) != 18 {
|
||||
t.Errorf("Want 18. Got %d\n", len(val))
|
||||
}
|
||||
if val[0] != 980.5 {
|
||||
t.Errorf("Want 950.5 Got %f\n", val[0])
|
||||
}
|
||||
if val[17] != 930.5 {
|
||||
t.Errorf("Want 930.5 Got %f\n", val[17])
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMetricExceedBoth(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i])
|
||||
}
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i])
|
||||
}
|
||||
|
||||
// printMemStore(m)
|
||||
|
||||
val, tsFrom, err := m.GetMetric(key, "flops", 1584022800, 1584027900)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Got error\n")
|
||||
}
|
||||
if tsFrom != 1584023000 {
|
||||
t.Errorf("Start ts differs: %d\n", tsFrom)
|
||||
}
|
||||
if len(val) != 20 {
|
||||
t.Errorf("Want 20. Got %d\n", len(val))
|
||||
}
|
||||
if val[0] != 100.5 {
|
||||
t.Errorf("Want 950.5 Got %f\n", val[0])
|
||||
}
|
||||
if val[19] != 930.5 {
|
||||
t.Errorf("Want 930.5 Got %f\n", val[17])
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMetricOutUpper(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i])
|
||||
}
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i])
|
||||
}
|
||||
|
||||
// printMemStore(m)
|
||||
|
||||
_, _, err := m.GetMetric(key, "flops", 1584032800, 1584037900)
|
||||
|
||||
if err == nil {
|
||||
t.Errorf("Got no error\n")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMetricOutLower(t *testing.T) {
|
||||
key := "m1220"
|
||||
m := newMemoryStore([]string{"flops", "mem_bw", "mem_capacity"}, 10, 60)
|
||||
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023000+i*60), testMetrics[i])
|
||||
}
|
||||
for i := 0; i < len(testMetrics); i++ {
|
||||
m.AddMetrics(key, int64(1584023600+i*60), testMetricsAlt[i])
|
||||
}
|
||||
|
||||
// printMemStore(m)
|
||||
|
||||
_, _, err := m.GetMetric(key, "flops", 1584002800, 1584007900)
|
||||
|
||||
if err == nil {
|
||||
t.Errorf("Got no error\n")
|
||||
}
|
||||
}
|
89
metric-store.go
Normal file
89
metric-store.go
Normal file
@ -0,0 +1,89 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
nats "github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
MemoryStore struct {
|
||||
Duration string `json:"duration"`
|
||||
} `json:"memory_store"`
|
||||
FileStore struct {
|
||||
Duration string `json:"duration"`
|
||||
} `json:"file_store"`
|
||||
Root string `json:"root"`
|
||||
Frequency int `json:"frequency"`
|
||||
Metrics []string `json:"metrics"`
|
||||
}
|
||||
|
||||
type MetricData struct {
|
||||
Name string
|
||||
Values []float64
|
||||
}
|
||||
|
||||
type Metric struct {
|
||||
Name string
|
||||
Value float64
|
||||
}
|
||||
|
||||
type message struct {
|
||||
Ts int64
|
||||
Tags []string
|
||||
Fields []Metric
|
||||
}
|
||||
|
||||
var Conf Config
|
||||
|
||||
func loadConfiguration(file string) Config {
|
||||
var config Config
|
||||
configFile, err := os.Open(file)
|
||||
defer configFile.Close()
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
}
|
||||
jsonParser := json.NewDecoder(configFile)
|
||||
jsonParser.Decode(&config)
|
||||
return config
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
||||
Conf = loadConfiguration("config.json")
|
||||
|
||||
// Connect to a server
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
var msgBuffer bytes.Buffer
|
||||
dec := gob.NewDecoder(&msgBuffer)
|
||||
|
||||
// Use a WaitGroup to wait for a message to arrive
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
|
||||
// Subscribe
|
||||
if _, err := nc.Subscribe("updates", func(m *nats.Msg) {
|
||||
log.Println(m.Subject)
|
||||
var p message
|
||||
err = dec.Decode(&p)
|
||||
if err != nil {
|
||||
log.Fatal("decode error 1:", err)
|
||||
}
|
||||
}); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// Wait for a message to come in
|
||||
wg.Wait()
|
||||
}
|
Loading…
Reference in New Issue
Block a user