mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2025-07-06 21:43:49 +02:00
Fix names and add missing methods
This commit is contained in:
parent
03a5dee97a
commit
d54bedc82f
@ -6,7 +6,6 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
@ -16,7 +15,12 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-metric-store/internal/api"
|
||||||
|
"github.com/ClusterCockpit/cc-metric-store/internal/api/apiv1"
|
||||||
|
"github.com/ClusterCockpit/cc-metric-store/internal/memstore"
|
||||||
|
"github.com/ClusterCockpit/cc-metric-store/internal/types"
|
||||||
"github.com/google/gops/agent"
|
"github.com/google/gops/agent"
|
||||||
|
"github.com/influxdata/line-protocol/v2/lineprotocol"
|
||||||
)
|
)
|
||||||
|
|
||||||
// For aggregation over multiple values at different cpus/sockets/..., not time!
|
// For aggregation over multiple values at different cpus/sockets/..., not time!
|
||||||
@ -47,17 +51,6 @@ func (as *AggregationStrategy) UnmarshalJSON(data []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetricConfig struct {
|
|
||||||
// Interval in seconds at which measurements will arive.
|
|
||||||
Frequency int64 `json:"frequency"`
|
|
||||||
|
|
||||||
// Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy.
|
|
||||||
Aggregation AggregationStrategy `json:"aggregation"`
|
|
||||||
|
|
||||||
// Private, used internally...
|
|
||||||
offset int
|
|
||||||
}
|
|
||||||
|
|
||||||
type HttpConfig struct {
|
type HttpConfig struct {
|
||||||
// Address to bind to, for example "0.0.0.0:8081"
|
// Address to bind to, for example "0.0.0.0:8081"
|
||||||
Address string `json:"address"`
|
Address string `json:"address"`
|
||||||
@ -69,27 +62,10 @@ type HttpConfig struct {
|
|||||||
KeyFile string `json:"https-key-file"`
|
KeyFile string `json:"https-key-file"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type NatsConfig struct {
|
|
||||||
// Address of the nats server
|
|
||||||
Address string `json:"address"`
|
|
||||||
|
|
||||||
// Username/Password, optional
|
|
||||||
Username string `json:"username"`
|
|
||||||
Password string `json:"password"`
|
|
||||||
|
|
||||||
Subscriptions []struct {
|
|
||||||
// Channel name
|
|
||||||
SubscribeTo string `json:"subscribe-to"`
|
|
||||||
|
|
||||||
// Allow lines without a cluster tag, use this as default, optional
|
|
||||||
ClusterTag string `json:"cluster-tag"`
|
|
||||||
} `json:"subscriptions"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Metrics map[string]MetricConfig `json:"metrics"`
|
Metrics map[string]types.MetricConfig `json:"metrics"`
|
||||||
RetentionInMemory string `json:"retention-in-memory"`
|
RetentionInMemory string `json:"retention-in-memory"`
|
||||||
Nats []*NatsConfig `json:"nats"`
|
Nats []*api.NatsConfig `json:"nats"`
|
||||||
JwtPublicKey string `json:"jwt-public-key"`
|
JwtPublicKey string `json:"jwt-public-key"`
|
||||||
HttpConfig *HttpConfig `json:"http-api"`
|
HttpConfig *HttpConfig `json:"http-api"`
|
||||||
Checkpoints struct {
|
Checkpoints struct {
|
||||||
@ -104,17 +80,13 @@ type Config struct {
|
|||||||
} `json:"archive"`
|
} `json:"archive"`
|
||||||
Debug struct {
|
Debug struct {
|
||||||
EnableGops bool `json:"gops"`
|
EnableGops bool `json:"gops"`
|
||||||
DumpToFile string `json:"dump-to-file"`
|
|
||||||
} `json:"debug"`
|
} `json:"debug"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var conf Config
|
var conf Config
|
||||||
var memoryStore *MemoryStore = nil
|
var memoryStore *memstore.MemoryStore = nil
|
||||||
var lastCheckpoint time.Time
|
var lastCheckpoint time.Time
|
||||||
|
|
||||||
var debugDumpLock sync.Mutex
|
|
||||||
var debugDump io.Writer = io.Discard
|
|
||||||
|
|
||||||
func loadConfiguration(file string) Config {
|
func loadConfiguration(file string) Config {
|
||||||
var config Config
|
var config Config
|
||||||
configFile, err := os.Open(file)
|
configFile, err := os.Open(file)
|
||||||
@ -163,14 +135,10 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
case <-ticks:
|
case <-ticks:
|
||||||
t := time.Now().Add(-d)
|
t := time.Now().Add(-d)
|
||||||
log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
|
log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
|
||||||
freed, err := memoryStore.Free(nil, t.Unix())
|
freed := memoryStore.Free(t.Unix())
|
||||||
if err != nil {
|
|
||||||
log.Printf("freeing up buffers failed: %s\n", err.Error())
|
|
||||||
} else {
|
|
||||||
log.Printf("done: %d buffers freed\n", freed)
|
log.Printf("done: %d buffers freed\n", freed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
lastCheckpoint = time.Now()
|
lastCheckpoint = time.Now()
|
||||||
@ -192,7 +160,7 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
case <-ticks:
|
case <-ticks:
|
||||||
log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
|
log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
n, err := memoryStore.ToCheckpoint(conf.Checkpoints.RootDir,
|
n, err := memoryStore.ToJSONCheckpoint(conf.Checkpoints.RootDir,
|
||||||
lastCheckpoint.Unix(), now.Unix())
|
lastCheckpoint.Unix(), now.Unix())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("checkpointing failed: %s\n", err.Error())
|
log.Printf("checkpointing failed: %s\n", err.Error())
|
||||||
@ -222,7 +190,7 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) {
|
|||||||
case <-ticks:
|
case <-ticks:
|
||||||
t := time.Now().Add(-d)
|
t := time.Now().Add(-d)
|
||||||
log.Printf("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339))
|
log.Printf("start archiving checkpoints (older than %s)...\n", t.Format(time.RFC3339))
|
||||||
n, err := ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix(), conf.Archive.DeleteInstead)
|
n, err := memstore.ArchiveCheckpoints(conf.Checkpoints.RootDir, conf.Archive.RootDir, t.Unix(), conf.Archive.DeleteInstead)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("archiving failed: %s\n", err.Error())
|
log.Printf("archiving failed: %s\n", err.Error())
|
||||||
} else {
|
} else {
|
||||||
@ -242,7 +210,7 @@ func main() {
|
|||||||
|
|
||||||
startupTime := time.Now()
|
startupTime := time.Now()
|
||||||
conf = loadConfiguration(configFile)
|
conf = loadConfiguration(configFile)
|
||||||
memoryStore = NewMemoryStore(conf.Metrics)
|
memoryStore = memstore.NewMemoryStore(conf.Metrics)
|
||||||
|
|
||||||
if enableGopsAgent || conf.Debug.EnableGops {
|
if enableGopsAgent || conf.Debug.EnableGops {
|
||||||
if err := agent.Listen(agent.Options{}); err != nil {
|
if err := agent.Listen(agent.Options{}); err != nil {
|
||||||
@ -250,15 +218,6 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if conf.Debug.DumpToFile != "" {
|
|
||||||
f, err := os.Create(conf.Debug.DumpToFile)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
debugDump = f
|
|
||||||
}
|
|
||||||
|
|
||||||
d, err := time.ParseDuration(conf.Checkpoints.Restore)
|
d, err := time.ParseDuration(conf.Checkpoints.Restore)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
@ -266,7 +225,7 @@ func main() {
|
|||||||
|
|
||||||
restoreFrom := startupTime.Add(-d)
|
restoreFrom := startupTime.Add(-d)
|
||||||
log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
|
log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
|
||||||
files, err := memoryStore.FromCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix())
|
files, err := memoryStore.FromJSONCheckpoint(conf.Checkpoints.RootDir, restoreFrom.Unix())
|
||||||
loadedData := memoryStore.SizeInBytes() / 1024 / 1024 // In MB
|
loadedData := memoryStore.SizeInBytes() / 1024 / 1024 // In MB
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
|
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
|
||||||
@ -307,8 +266,16 @@ func main() {
|
|||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
||||||
|
httpApi := apiv1.HttpApi{
|
||||||
|
MemoryStore: memoryStore,
|
||||||
|
PublicKey: conf.JwtPublicKey,
|
||||||
|
Address: conf.HttpConfig.Address,
|
||||||
|
CertFile: conf.HttpConfig.CertFile,
|
||||||
|
KeyFile: conf.HttpConfig.KeyFile,
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err := StartApiServer(ctx, conf.HttpConfig)
|
err := httpApi.StartServer(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -322,7 +289,9 @@ func main() {
|
|||||||
nc := natsConf
|
nc := natsConf
|
||||||
go func() {
|
go func() {
|
||||||
// err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx)
|
// err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx)
|
||||||
err := ReceiveNats(nc, decodeLine, 1, ctx)
|
err := api.ReceiveNats(nc, func(d *lineprotocol.Decoder, s string) error {
|
||||||
|
return api.DecodeLine(memoryStore, d, s)
|
||||||
|
}, 1, ctx)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
@ -335,15 +304,9 @@ func main() {
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
log.Printf("Writing to '%s'...\n", conf.Checkpoints.RootDir)
|
log.Printf("Writing to '%s'...\n", conf.Checkpoints.RootDir)
|
||||||
files, err = memoryStore.ToCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
|
files, err = memoryStore.ToJSONCheckpoint(conf.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Writing checkpoint failed: %s\n", err.Error())
|
log.Printf("Writing checkpoint failed: %s\n", err.Error())
|
||||||
}
|
}
|
||||||
log.Printf("Done! (%d files written)\n", files)
|
log.Printf("Done! (%d files written)\n", files)
|
||||||
|
|
||||||
if closer, ok := debugDump.(io.Closer); ok {
|
|
||||||
if err := closer.Close(); err != nil {
|
|
||||||
log.Printf("error: %s", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -187,7 +187,7 @@ func reorder(buf, prefix []byte) []byte {
|
|||||||
|
|
||||||
// Decode lines using dec and make write calls to the MemoryStore.
|
// Decode lines using dec and make write calls to the MemoryStore.
|
||||||
// If a line is missing its cluster tag, use clusterDefault as default.
|
// If a line is missing its cluster tag, use clusterDefault as default.
|
||||||
func decodeLine(memoryStore *memstore.MemoryStore, dec *lineprotocol.Decoder, clusterDefault string) error {
|
func DecodeLine(memoryStore *memstore.MemoryStore, dec *lineprotocol.Decoder, clusterDefault string) error {
|
||||||
// Reduce allocations in loop:
|
// Reduce allocations in loop:
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
metric, metricBuf := types.Metric{}, make([]byte, 0, 16)
|
metric, metricBuf := types.Metric{}, make([]byte, 0, 16)
|
||||||
|
@ -139,7 +139,7 @@ func BenchmarkLineprotocolDecoder(b *testing.B) {
|
|||||||
dec := lineprotocol.NewDecoderWithBytes(data)
|
dec := lineprotocol.NewDecoderWithBytes(data)
|
||||||
|
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
if err := decodeLine(memoryStore, dec, "ctest"); err != nil {
|
if err := DecodeLine(memoryStore, dec, "ctest"); err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
b.StopTimer()
|
b.StopTimer()
|
||||||
|
@ -124,7 +124,7 @@ func (c *chunk) read(from, to int64, data []types.Float) ([]types.Float, int64,
|
|||||||
return data[:i], from, t, nil
|
return data[:i], from, t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *chunk) stats(from, to int64, data []types.Float) (types.Stats, int64, int64, error) {
|
func (c *chunk) stats(from, to int64) (types.Stats, int64, int64, error) {
|
||||||
stats := types.Stats{
|
stats := types.Stats{
|
||||||
Samples: 0,
|
Samples: 0,
|
||||||
Min: types.Float(math.MaxFloat64),
|
Min: types.Float(math.MaxFloat64),
|
||||||
@ -134,7 +134,7 @@ func (c *chunk) stats(from, to int64, data []types.Float) (types.Stats, int64, i
|
|||||||
|
|
||||||
if from < c.firstWrite() {
|
if from < c.firstWrite() {
|
||||||
if c.prev != nil {
|
if c.prev != nil {
|
||||||
return c.prev.stats(from, to, data)
|
return c.prev.stats(from, to)
|
||||||
}
|
}
|
||||||
from = c.firstWrite()
|
from = c.firstWrite()
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package main
|
package memstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
@ -6,22 +6,22 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (b *buffer) debugDump(buf []byte) []byte {
|
func (c *chunk) debugDump(buf []byte) []byte {
|
||||||
if b.prev != nil {
|
if c.prev != nil {
|
||||||
buf = b.prev.debugDump(buf)
|
buf = c.prev.debugDump(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
start, len, end := b.start, len(b.data), b.start+b.frequency*int64(len(b.data))
|
start, len, end := c.start, len(c.data), c.start+c.frequency*int64(len(c.data))
|
||||||
buf = append(buf, `{"start":`...)
|
buf = append(buf, `{"start":`...)
|
||||||
buf = strconv.AppendInt(buf, start, 10)
|
buf = strconv.AppendInt(buf, start, 10)
|
||||||
buf = append(buf, `,"len":`...)
|
buf = append(buf, `,"len":`...)
|
||||||
buf = strconv.AppendInt(buf, int64(len), 10)
|
buf = strconv.AppendInt(buf, int64(len), 10)
|
||||||
buf = append(buf, `,"end":`...)
|
buf = append(buf, `,"end":`...)
|
||||||
buf = strconv.AppendInt(buf, end, 10)
|
buf = strconv.AppendInt(buf, end, 10)
|
||||||
if b.archived {
|
if c.checkpointed {
|
||||||
buf = append(buf, `,"saved":true`...)
|
buf = append(buf, `,"saved":true`...)
|
||||||
}
|
}
|
||||||
if b.next != nil {
|
if c.next != nil {
|
||||||
buf = append(buf, `},`...)
|
buf = append(buf, `},`...)
|
||||||
} else {
|
} else {
|
||||||
buf = append(buf, `}`...)
|
buf = append(buf, `}`...)
|
||||||
@ -29,7 +29,7 @@ func (b *buffer) debugDump(buf []byte) []byte {
|
|||||||
return buf
|
return buf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf []byte, depth int) ([]byte, error) {
|
func (l *Level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf []byte, depth int) ([]byte, error) {
|
||||||
l.lock.RLock()
|
l.lock.RLock()
|
||||||
defer l.lock.RUnlock()
|
defer l.lock.RUnlock()
|
||||||
for i := 0; i < depth; i++ {
|
for i := 0; i < depth; i++ {
|
||||||
@ -41,7 +41,7 @@ func (l *level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf [
|
|||||||
depth += 1
|
depth += 1
|
||||||
objitems := 0
|
objitems := 0
|
||||||
for name, mc := range m.metrics {
|
for name, mc := range m.metrics {
|
||||||
if b := l.metrics[mc.offset]; b != nil {
|
if b := l.metrics[mc.Offset]; b != nil {
|
||||||
for i := 0; i < depth; i++ {
|
for i := 0; i < depth; i++ {
|
||||||
buf = append(buf, '\t')
|
buf = append(buf, '\t')
|
||||||
}
|
}
|
||||||
@ -55,7 +55,7 @@ func (l *level) debugDump(m *MemoryStore, w *bufio.Writer, lvlname string, buf [
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for name, lvl := range l.children {
|
for name, lvl := range l.sublevels {
|
||||||
_, err := w.Write(buf)
|
_, err := w.Write(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
"github.com/ClusterCockpit/cc-metric-store/internal/types"
|
"github.com/ClusterCockpit/cc-metric-store/internal/types"
|
||||||
)
|
)
|
||||||
@ -139,6 +140,26 @@ func (l *Level) countValues(from int64, to int64) int {
|
|||||||
return vals
|
return vals
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *Level) sizeInBytes() int64 {
|
||||||
|
l.lock.RLock()
|
||||||
|
defer l.lock.RUnlock()
|
||||||
|
size := int64(0)
|
||||||
|
|
||||||
|
for _, b := range l.metrics {
|
||||||
|
for b != nil {
|
||||||
|
size += int64(len(b.data))
|
||||||
|
b = b.prev
|
||||||
|
}
|
||||||
|
}
|
||||||
|
size *= int64(unsafe.Sizeof(types.Float(0)))
|
||||||
|
|
||||||
|
for _, child := range l.sublevels {
|
||||||
|
size += child.sizeInBytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
type MemoryStore struct {
|
type MemoryStore struct {
|
||||||
root Level // root of the tree structure
|
root Level // root of the tree structure
|
||||||
// TODO...
|
// TODO...
|
||||||
@ -388,3 +409,90 @@ func (m *MemoryStore) Read(selector types.Selector, metric string, from, to int6
|
|||||||
|
|
||||||
return data, from, to, nil
|
return data, from, to, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns statistics for the requested metric on the selected node/level.
|
||||||
|
// Data is aggregated to the selected level the same way as in `MemoryStore.Read`.
|
||||||
|
// If `Stats.Samples` is zero, the statistics should not be considered as valid.
|
||||||
|
func (m *MemoryStore) Stats(selector types.Selector, metric string, from, to int64) (*types.Stats, int64, int64, error) {
|
||||||
|
if from > to {
|
||||||
|
return nil, 0, 0, errors.New("invalid time range")
|
||||||
|
}
|
||||||
|
|
||||||
|
mc, ok := m.metrics[metric]
|
||||||
|
if !ok {
|
||||||
|
return nil, 0, 0, errors.New("unkown metric: " + metric)
|
||||||
|
}
|
||||||
|
|
||||||
|
n, samples := 0, 0
|
||||||
|
avg, min, max := types.Float(0), math.MaxFloat32, -math.MaxFloat32
|
||||||
|
err := m.root.findBuffers(selector, mc.Offset, func(c *chunk) error {
|
||||||
|
stats, cfrom, cto, err := c.stats(from, to)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if n == 0 {
|
||||||
|
from, to = cfrom, cto
|
||||||
|
} else if from != cfrom || to != cto {
|
||||||
|
return ErrDataDoesNotAlign
|
||||||
|
}
|
||||||
|
|
||||||
|
samples += stats.Samples
|
||||||
|
avg += stats.Avg
|
||||||
|
min = math.Min(min, float64(stats.Min))
|
||||||
|
max = math.Max(max, float64(stats.Max))
|
||||||
|
n += 1
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if n == 0 {
|
||||||
|
return nil, 0, 0, ErrNoData
|
||||||
|
}
|
||||||
|
|
||||||
|
if mc.Aggregation == types.AvgAggregation {
|
||||||
|
avg /= types.Float(n)
|
||||||
|
} else if n > 1 && mc.Aggregation != types.SumAggregation {
|
||||||
|
return nil, 0, 0, errors.New("invalid aggregation")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &types.Stats{
|
||||||
|
Samples: samples,
|
||||||
|
Avg: avg,
|
||||||
|
Min: types.Float(min),
|
||||||
|
Max: types.Float(max),
|
||||||
|
}, from, to, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Given a selector, return a list of all children of the level selected.
|
||||||
|
func (m *MemoryStore) ListChildren(selector []string) []string {
|
||||||
|
lvl := &m.root
|
||||||
|
for lvl != nil && len(selector) != 0 {
|
||||||
|
lvl.lock.RLock()
|
||||||
|
next := lvl.sublevels[selector[0]]
|
||||||
|
lvl.lock.RUnlock()
|
||||||
|
lvl = next
|
||||||
|
selector = selector[1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
if lvl == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
lvl.lock.RLock()
|
||||||
|
defer lvl.lock.RUnlock()
|
||||||
|
|
||||||
|
children := make([]string, 0, len(lvl.sublevels))
|
||||||
|
for child := range lvl.sublevels {
|
||||||
|
children = append(children, child)
|
||||||
|
}
|
||||||
|
|
||||||
|
return children
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MemoryStore) SizeInBytes() int64 {
|
||||||
|
return m.root.sizeInBytes()
|
||||||
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package main
|
package memstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -6,20 +6,22 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-metric-store/internal/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMemoryStoreBasics(t *testing.T) {
|
func TestMemoryStoreBasics(t *testing.T) {
|
||||||
frequency := int64(10)
|
frequency := int64(10)
|
||||||
start, count := int64(100), int64(5000)
|
start, count := int64(100), int64(5000)
|
||||||
store := NewMemoryStore(map[string]MetricConfig{
|
store := NewMemoryStore(map[string]types.MetricConfig{
|
||||||
"a": {Frequency: frequency},
|
"a": {Frequency: frequency},
|
||||||
"b": {Frequency: frequency * 2},
|
"b": {Frequency: frequency * 2},
|
||||||
})
|
})
|
||||||
|
|
||||||
for i := int64(0); i < count; i++ {
|
for i := int64(0); i < count; i++ {
|
||||||
err := store.Write([]string{"testhost"}, start+i*frequency, []Metric{
|
err := store.Write([]string{"testhost"}, start+i*frequency, []types.Metric{
|
||||||
{Name: "a", Value: Float(i)},
|
{Name: "a", Value: types.Float(i)},
|
||||||
{Name: "b", Value: Float(i / 2)},
|
{Name: "b", Value: types.Float(i / 2)},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -27,7 +29,7 @@ func TestMemoryStoreBasics(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sel := Selector{{String: "testhost"}}
|
sel := types.Selector{{String: "testhost"}}
|
||||||
adata, from, to, err := store.Read(sel, "a", start, start+count*frequency)
|
adata, from, to, err := store.Read(sel, "a", start, start+count*frequency)
|
||||||
if err != nil || from != start || to != start+count*frequency {
|
if err != nil || from != start || to != start+count*frequency {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -45,14 +47,14 @@ func TestMemoryStoreBasics(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < int(count); i++ {
|
for i := 0; i < int(count); i++ {
|
||||||
if adata[i] != Float(i) {
|
if adata[i] != types.Float(i) {
|
||||||
t.Errorf("incorrect value for metric a (%f vs. %f)", adata[i], Float(i))
|
t.Errorf("incorrect value for metric a (%f vs. %f)", adata[i], types.Float(i))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < int(count/2); i++ {
|
for i := 0; i < int(count/2); i++ {
|
||||||
if bdata[i] != Float(i) && bdata[i] != Float(i-1) {
|
if bdata[i] != types.Float(i) && bdata[i] != types.Float(i-1) {
|
||||||
t.Errorf("incorrect value for metric b (%f) at index %d", bdata[i], i)
|
t.Errorf("incorrect value for metric b (%f) at index %d", bdata[i], i)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -62,8 +64,8 @@ func TestMemoryStoreBasics(t *testing.T) {
|
|||||||
|
|
||||||
func TestMemoryStoreTooMuchWrites(t *testing.T) {
|
func TestMemoryStoreTooMuchWrites(t *testing.T) {
|
||||||
frequency := int64(10)
|
frequency := int64(10)
|
||||||
count := BUFFER_CAP*3 + 10
|
count := bufferSizeInFloats*3 + 10
|
||||||
store := NewMemoryStore(map[string]MetricConfig{
|
store := NewMemoryStore(map[string]types.MetricConfig{
|
||||||
"a": {Frequency: frequency},
|
"a": {Frequency: frequency},
|
||||||
"b": {Frequency: frequency * 2},
|
"b": {Frequency: frequency * 2},
|
||||||
"c": {Frequency: frequency / 2},
|
"c": {Frequency: frequency / 2},
|
||||||
@ -72,33 +74,33 @@ func TestMemoryStoreTooMuchWrites(t *testing.T) {
|
|||||||
|
|
||||||
start := int64(100)
|
start := int64(100)
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
if err := store.Write([]string{"test"}, start+int64(i)*frequency, []Metric{
|
if err := store.Write([]string{"test"}, start+int64(i)*frequency, []types.Metric{
|
||||||
{Name: "a", Value: Float(i)},
|
{Name: "a", Value: types.Float(i)},
|
||||||
{Name: "b", Value: Float(i / 2)},
|
{Name: "b", Value: types.Float(i / 2)},
|
||||||
{Name: "c", Value: Float(i * 2)},
|
{Name: "c", Value: types.Float(i * 2)},
|
||||||
{Name: "d", Value: Float(i / 3)},
|
{Name: "d", Value: types.Float(i / 3)},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
end := start + int64(count)*frequency
|
end := start + int64(count)*frequency
|
||||||
data, from, to, err := store.Read(Selector{{String: "test"}}, "a", start, end)
|
data, from, to, err := store.Read(types.Selector{{String: "test"}}, "a", start, end)
|
||||||
if len(data) != count || from != start || to != end || err != nil {
|
if len(data) != count || from != start || to != end || err != nil {
|
||||||
t.Fatalf("a: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
|
t.Fatalf("a: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
data, from, to, err = store.Read(Selector{{String: "test"}}, "b", start, end)
|
data, from, to, err = store.Read(types.Selector{{String: "test"}}, "b", start, end)
|
||||||
if len(data) != count/2 || from != start || to != end || err != nil {
|
if len(data) != count/2 || from != start || to != end || err != nil {
|
||||||
t.Fatalf("b: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
|
t.Fatalf("b: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
data, from, to, err = store.Read(Selector{{String: "test"}}, "c", start, end)
|
data, from, to, err = store.Read(types.Selector{{String: "test"}}, "c", start, end)
|
||||||
if len(data) != count*2-1 || from != start || to != end-frequency/2 || err != nil {
|
if len(data) != count*2-1 || from != start || to != end-frequency/2 || err != nil {
|
||||||
t.Fatalf("c: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
|
t.Fatalf("c: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
data, from, to, err = store.Read(Selector{{String: "test"}}, "d", start, end)
|
data, from, to, err = store.Read(types.Selector{{String: "test"}}, "d", start, end)
|
||||||
if len(data) != count/3+1 || from != start || to != end+frequency*2 || err != nil {
|
if len(data) != count/3+1 || from != start || to != end+frequency*2 || err != nil {
|
||||||
t.Errorf("expected: err=nil, from=%d, to=%d, len(data)=%d\n", start, end+frequency*2, count/3)
|
t.Errorf("expected: err=nil, from=%d, to=%d, len(data)=%d\n", start, end+frequency*2, count/3)
|
||||||
t.Fatalf("d: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
|
t.Fatalf("d: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
|
||||||
@ -108,19 +110,19 @@ func TestMemoryStoreTooMuchWrites(t *testing.T) {
|
|||||||
func TestMemoryStoreOutOfBounds(t *testing.T) {
|
func TestMemoryStoreOutOfBounds(t *testing.T) {
|
||||||
count := 2000
|
count := 2000
|
||||||
toffset := 1000
|
toffset := 1000
|
||||||
store := NewMemoryStore(map[string]MetricConfig{
|
store := NewMemoryStore(map[string]types.MetricConfig{
|
||||||
"a": {Frequency: 60},
|
"a": {Frequency: 60},
|
||||||
})
|
})
|
||||||
|
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
if err := store.Write([]string{"cluster", "host", "cpu"}, int64(toffset+i*60), []Metric{
|
if err := store.Write([]string{"cluster", "host", "cpu"}, int64(toffset+i*60), []types.Metric{
|
||||||
{Name: "a", Value: Float(i)},
|
{Name: "a", Value: types.Float(i)},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu"}}
|
sel := types.Selector{{String: "cluster"}, {String: "host"}, {String: "cpu"}}
|
||||||
data, from, to, err := store.Read(sel, "a", 500, int64(toffset+count*60+500))
|
data, from, to, err := store.Read(sel, "a", 500, int64(toffset+count*60+500))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -130,9 +132,9 @@ func TestMemoryStoreOutOfBounds(t *testing.T) {
|
|||||||
t.Fatalf("Got %d-%d, expected %d-%d", from, to, toffset, toffset+count*60)
|
t.Fatalf("Got %d-%d, expected %d-%d", from, to, toffset, toffset+count*60)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(data) != count || data[0] != 0 || data[len(data)-1] != Float((count-1)) {
|
if len(data) != count || data[0] != 0 || data[len(data)-1] != types.Float((count-1)) {
|
||||||
t.Fatalf("Wrong data (got: %d, %f, %f, expected: %d, %f, %f)",
|
t.Fatalf("Wrong data (got: %d, %f, %f, expected: %d, %f, %f)",
|
||||||
len(data), data[0], data[len(data)-1], count, 0., Float(count-1))
|
len(data), data[0], data[len(data)-1], count, 0., types.Float(count-1))
|
||||||
}
|
}
|
||||||
|
|
||||||
testfrom, testlen := int64(100000000), int64(10000)
|
testfrom, testlen := int64(100000000), int64(10000)
|
||||||
@ -150,7 +152,7 @@ func TestMemoryStoreOutOfBounds(t *testing.T) {
|
|||||||
|
|
||||||
func TestMemoryStoreMissingDatapoints(t *testing.T) {
|
func TestMemoryStoreMissingDatapoints(t *testing.T) {
|
||||||
count := 3000
|
count := 3000
|
||||||
store := NewMemoryStore(map[string]MetricConfig{
|
store := NewMemoryStore(map[string]types.MetricConfig{
|
||||||
"a": {Frequency: 1},
|
"a": {Frequency: 1},
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -159,8 +161,8 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err := store.Write([]string{"testhost"}, int64(i), []Metric{
|
err := store.Write([]string{"testhost"}, int64(i), []types.Metric{
|
||||||
{Name: "a", Value: Float(i)},
|
{Name: "a", Value: types.Float(i)},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -168,7 +170,7 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sel := Selector{{String: "testhost"}}
|
sel := types.Selector{{String: "testhost"}}
|
||||||
adata, _, _, err := store.Read(sel, "a", 0, int64(count))
|
adata, _, _, err := store.Read(sel, "a", 0, int64(count))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -182,7 +184,7 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) {
|
|||||||
|
|
||||||
for i := 0; i < count-2; i++ {
|
for i := 0; i < count-2; i++ {
|
||||||
if i%3 == 0 {
|
if i%3 == 0 {
|
||||||
if adata[i] != Float(i) {
|
if adata[i] != types.Float(i) {
|
||||||
t.Error("unexpected value")
|
t.Error("unexpected value")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -197,21 +199,21 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) {
|
|||||||
|
|
||||||
func TestMemoryStoreAggregation(t *testing.T) {
|
func TestMemoryStoreAggregation(t *testing.T) {
|
||||||
count := 3000
|
count := 3000
|
||||||
store := NewMemoryStore(map[string]MetricConfig{
|
store := NewMemoryStore(map[string]types.MetricConfig{
|
||||||
"a": {Frequency: 1, Aggregation: SumAggregation},
|
"a": {Frequency: 1, Aggregation: types.SumAggregation},
|
||||||
})
|
})
|
||||||
|
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
err := store.Write([]string{"host0", "cpu0"}, int64(i), []Metric{
|
err := store.Write([]string{"host0", "cpu0"}, int64(i), []types.Metric{
|
||||||
{Name: "a", Value: Float(i) / 2.},
|
{Name: "a", Value: types.Float(i) / 2.},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = store.Write([]string{"host0", "cpu1"}, int64(i), []Metric{
|
err = store.Write([]string{"host0", "cpu1"}, int64(i), []types.Metric{
|
||||||
{Name: "a", Value: Float(i) * 2.},
|
{Name: "a", Value: types.Float(i) * 2.},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -219,7 +221,7 @@ func TestMemoryStoreAggregation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
adata, from, to, err := store.Read(Selector{{String: "host0"}}, "a", int64(0), int64(count))
|
adata, from, to, err := store.Read(types.Selector{{String: "host0"}}, "a", int64(0), int64(count))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
@ -231,7 +233,7 @@ func TestMemoryStoreAggregation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
expected := Float(i)/2. + Float(i)*2.
|
expected := types.Float(i)/2. + types.Float(i)*2.
|
||||||
if adata[i] != expected {
|
if adata[i] != expected {
|
||||||
t.Errorf("expected: %f, got: %f", expected, adata[i])
|
t.Errorf("expected: %f, got: %f", expected, adata[i])
|
||||||
return
|
return
|
||||||
@ -241,9 +243,9 @@ func TestMemoryStoreAggregation(t *testing.T) {
|
|||||||
|
|
||||||
func TestMemoryStoreStats(t *testing.T) {
|
func TestMemoryStoreStats(t *testing.T) {
|
||||||
count := 3000
|
count := 3000
|
||||||
store := NewMemoryStore(map[string]MetricConfig{
|
store := NewMemoryStore(map[string]types.MetricConfig{
|
||||||
"a": {Frequency: 1},
|
"a": {Frequency: 1},
|
||||||
"b": {Frequency: 1, Aggregation: AvgAggregation},
|
"b": {Frequency: 1, Aggregation: types.AvgAggregation},
|
||||||
})
|
})
|
||||||
|
|
||||||
sel1 := []string{"cluster", "host1"}
|
sel1 := []string{"cluster", "host1"}
|
||||||
@ -270,18 +272,18 @@ func TestMemoryStoreStats(t *testing.T) {
|
|||||||
bmin = math.Min(bmin, b)
|
bmin = math.Min(bmin, b)
|
||||||
bmax = math.Max(bmax, b)
|
bmax = math.Max(bmax, b)
|
||||||
|
|
||||||
store.Write(sel1, int64(i), []Metric{
|
store.Write(sel1, int64(i), []types.Metric{
|
||||||
{Name: "a", Value: Float(a)},
|
{Name: "a", Value: types.Float(a)},
|
||||||
})
|
})
|
||||||
store.Write(sel2, int64(i), []Metric{
|
store.Write(sel2, int64(i), []types.Metric{
|
||||||
{Name: "b", Value: Float(b)},
|
{Name: "b", Value: types.Float(b)},
|
||||||
})
|
})
|
||||||
store.Write(sel3, int64(i), []Metric{
|
store.Write(sel3, int64(i), []types.Metric{
|
||||||
{Name: "b", Value: Float(b)},
|
{Name: "b", Value: types.Float(b)},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
stats, from, to, err := store.Stats(Selector{{String: "cluster"}, {String: "host1"}}, "a", 0, int64(count))
|
stats, from, to, err := store.Stats(types.Selector{{String: "cluster"}, {String: "host1"}}, "a", 0, int64(count))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -290,11 +292,11 @@ func TestMemoryStoreStats(t *testing.T) {
|
|||||||
t.Fatalf("unexpected: from=%d, to=%d, stats.Samples=%d (expected samples=%d)\n", from, to, stats.Samples, samples)
|
t.Fatalf("unexpected: from=%d, to=%d, stats.Samples=%d (expected samples=%d)\n", from, to, stats.Samples, samples)
|
||||||
}
|
}
|
||||||
|
|
||||||
if stats.Avg != Float(asum/float64(samples)) || stats.Min != Float(amin) || stats.Max != Float(amax) {
|
if stats.Avg != types.Float(asum/float64(samples)) || stats.Min != types.Float(amin) || stats.Max != types.Float(amax) {
|
||||||
t.Fatalf("wrong stats: %#v\n", stats)
|
t.Fatalf("wrong stats: %#v\n", stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
stats, from, to, err = store.Stats(Selector{{String: "cluster"}, {String: "host2"}}, "b", 0, int64(count))
|
stats, from, to, err = store.Stats(types.Selector{{String: "cluster"}, {String: "host2"}}, "b", 0, int64(count))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -303,22 +305,22 @@ func TestMemoryStoreStats(t *testing.T) {
|
|||||||
t.Fatalf("unexpected: from=%d, to=%d, stats.Samples=%d (expected samples=%d)\n", from, to, stats.Samples, samples*2)
|
t.Fatalf("unexpected: from=%d, to=%d, stats.Samples=%d (expected samples=%d)\n", from, to, stats.Samples, samples*2)
|
||||||
}
|
}
|
||||||
|
|
||||||
if stats.Avg != Float(bsum/float64(samples*2)) || stats.Min != Float(bmin) || stats.Max != Float(bmax) {
|
if stats.Avg != types.Float(bsum/float64(samples*2)) || stats.Min != types.Float(bmin) || stats.Max != types.Float(bmax) {
|
||||||
t.Fatalf("wrong stats: %#v (expected: avg=%f, min=%f, max=%f)\n", stats, bsum/float64(samples*2), bmin, bmax)
|
t.Fatalf("wrong stats: %#v (expected: avg=%f, min=%f, max=%f)\n", stats, bsum/float64(samples*2), bmin, bmax)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMemoryStoreArchive(t *testing.T) {
|
func TestMemoryStoreArchive(t *testing.T) {
|
||||||
store1 := NewMemoryStore(map[string]MetricConfig{
|
store1 := NewMemoryStore(map[string]types.MetricConfig{
|
||||||
"a": {Frequency: 1},
|
"a": {Frequency: 1},
|
||||||
"b": {Frequency: 1},
|
"b": {Frequency: 1},
|
||||||
})
|
})
|
||||||
|
|
||||||
count := 2000
|
count := 2000
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
err := store1.Write([]string{"cluster", "host", "cpu0"}, 100+int64(i), []Metric{
|
err := store1.Write([]string{"cluster", "host", "cpu0"}, 100+int64(i), []types.Metric{
|
||||||
{Name: "a", Value: Float(i)},
|
{Name: "a", Value: types.Float(i)},
|
||||||
{Name: "b", Value: Float(i * 2)},
|
{Name: "b", Value: types.Float(i * 2)},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -329,29 +331,29 @@ func TestMemoryStoreArchive(t *testing.T) {
|
|||||||
// store1.DebugDump(bufio.NewWriter(os.Stdout))
|
// store1.DebugDump(bufio.NewWriter(os.Stdout))
|
||||||
|
|
||||||
archiveRoot := t.TempDir()
|
archiveRoot := t.TempDir()
|
||||||
_, err := store1.ToCheckpoint(archiveRoot, 100, 100+int64(count/2))
|
_, err := store1.ToJSONCheckpoint(archiveRoot, 100, 100+int64(count/2))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = store1.ToCheckpoint(archiveRoot, 100+int64(count/2), 100+int64(count))
|
_, err = store1.ToJSONCheckpoint(archiveRoot, 100+int64(count/2), 100+int64(count))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
store2 := NewMemoryStore(map[string]MetricConfig{
|
store2 := NewMemoryStore(map[string]types.MetricConfig{
|
||||||
"a": {Frequency: 1},
|
"a": {Frequency: 1},
|
||||||
"b": {Frequency: 1},
|
"b": {Frequency: 1},
|
||||||
})
|
})
|
||||||
n, err := store2.FromCheckpoint(archiveRoot, 100)
|
n, err := store2.FromJSONCheckpoint(archiveRoot, 100)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu0"}}
|
sel := types.Selector{{String: "cluster"}, {String: "host"}, {String: "cpu0"}}
|
||||||
adata, from, to, err := store2.Read(sel, "a", 100, int64(100+count))
|
adata, from, to, err := store2.Read(sel, "a", 100, int64(100+count))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -364,7 +366,7 @@ func TestMemoryStoreArchive(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
expected := Float(i)
|
expected := types.Float(i)
|
||||||
if adata[i] != expected {
|
if adata[i] != expected {
|
||||||
t.Errorf("expected: %f, got: %f", expected, adata[i])
|
t.Errorf("expected: %f, got: %f", expected, adata[i])
|
||||||
}
|
}
|
||||||
@ -372,7 +374,7 @@ func TestMemoryStoreArchive(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMemoryStoreFree(t *testing.T) {
|
func TestMemoryStoreFree(t *testing.T) {
|
||||||
store := NewMemoryStore(map[string]MetricConfig{
|
store := NewMemoryStore(map[string]types.MetricConfig{
|
||||||
"a": {Frequency: 1},
|
"a": {Frequency: 1},
|
||||||
"b": {Frequency: 2},
|
"b": {Frequency: 2},
|
||||||
})
|
})
|
||||||
@ -380,44 +382,41 @@ func TestMemoryStoreFree(t *testing.T) {
|
|||||||
count := 3000
|
count := 3000
|
||||||
sel := []string{"cluster", "host", "1"}
|
sel := []string{"cluster", "host", "1"}
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
err := store.Write(sel, int64(i), []Metric{
|
err := store.Write(sel, int64(i), []types.Metric{
|
||||||
{Name: "a", Value: Float(i)},
|
{Name: "a", Value: types.Float(i)},
|
||||||
{Name: "b", Value: Float(i)},
|
{Name: "b", Value: types.Float(i)},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := store.Free([]string{"cluster", "host"}, int64(BUFFER_CAP*2)+100)
|
n := store.Free(int64(bufferSizeInFloats*2) + 100)
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if n != 3 {
|
if n != 3 {
|
||||||
t.Fatal("two buffers expected to be released")
|
t.Fatal("two buffers expected to be released")
|
||||||
}
|
}
|
||||||
|
|
||||||
adata, from, to, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "a", 0, int64(count))
|
adata, from, to, err := store.Read(types.Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "a", 0, int64(count))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if from != int64(BUFFER_CAP*2) || to != int64(count) || len(adata) != count-2*BUFFER_CAP {
|
if from != int64(bufferSizeInFloats*2) || to != int64(count) || len(adata) != count-2*bufferSizeInFloats {
|
||||||
t.Fatalf("unexpected values from call to `Read`: from=%d, to=%d, len=%d", from, to, len(adata))
|
t.Fatalf("unexpected values from call to `Read`: from=%d, to=%d, len=%d", from, to, len(adata))
|
||||||
}
|
}
|
||||||
|
|
||||||
// bdata, from, to, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "b", 0, int64(count))
|
// bdata, from, to, err := store.Read(types.Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "b", 0, int64(count))
|
||||||
// if err != nil {
|
// if err != nil {
|
||||||
// t.Fatal(err)
|
// t.Fatal(err)
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// if from != int64(BUFFER_CAP*2) || to != int64(count) || len(bdata) != (count-2*BUFFER_CAP)/2 {
|
// if from != int64(bufferSizeInFloats*2) || to != int64(count) || len(bdata) != (count-2*bufferSizeInFloats)/2 {
|
||||||
// t.Fatalf("unexpected values from call to `Read`: from=%d (expected: %d), to=%d (expected: %d), len=%d (expected: %d)",
|
// t.Fatalf("unexpected values from call to `Read`: from=%d (expected: %d), to=%d (expected: %d), len=%d (expected: %d)",
|
||||||
// from, BUFFER_CAP*2, to, count, len(bdata), (count-2*BUFFER_CAP)/2)
|
// from, bufferSizeInFloats*2, to, count, len(bdata), (count-2*bufferSizeInFloats)/2)
|
||||||
// }
|
// }
|
||||||
|
|
||||||
if adata[0] != Float(BUFFER_CAP*2) || adata[len(adata)-1] != Float(count-1) {
|
if adata[0] != types.Float(bufferSizeInFloats*2) || adata[len(adata)-1] != types.Float(count-1) {
|
||||||
t.Fatal("wrong values")
|
t.Fatal("wrong values")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -426,7 +425,7 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) {
|
|||||||
frequency := int64(5)
|
frequency := int64(5)
|
||||||
count := b.N
|
count := b.N
|
||||||
goroutines := 4
|
goroutines := 4
|
||||||
store := NewMemoryStore(map[string]MetricConfig{
|
store := NewMemoryStore(map[string]types.MetricConfig{
|
||||||
"a": {Frequency: frequency},
|
"a": {Frequency: frequency},
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -437,8 +436,8 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) {
|
|||||||
go func(g int) {
|
go func(g int) {
|
||||||
host := fmt.Sprintf("host%d", g)
|
host := fmt.Sprintf("host%d", g)
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
store.Write([]string{"cluster", host, "cpu0"}, int64(i)*frequency, []Metric{
|
store.Write([]string{"cluster", host, "cpu0"}, int64(i)*frequency, []types.Metric{
|
||||||
{Name: "a", Value: Float(i)},
|
{Name: "a", Value: types.Float(i)},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
@ -450,7 +449,7 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) {
|
|||||||
|
|
||||||
for g := 0; g < goroutines; g++ {
|
for g := 0; g < goroutines; g++ {
|
||||||
host := fmt.Sprintf("host%d", g)
|
host := fmt.Sprintf("host%d", g)
|
||||||
sel := Selector{{String: "cluster"}, {String: host}, {String: "cpu0"}}
|
sel := types.Selector{{String: "cluster"}, {String: host}, {String: "cpu0"}}
|
||||||
adata, _, _, err := store.Read(sel, "a", 0, int64(count)*frequency)
|
adata, _, _, err := store.Read(sel, "a", 0, int64(count)*frequency)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Error(err)
|
b.Error(err)
|
||||||
@ -463,7 +462,7 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
expected := Float(i)
|
expected := types.Float(i)
|
||||||
if adata[i] != expected {
|
if adata[i] != expected {
|
||||||
b.Error("incorrect value for metric a")
|
b.Error("incorrect value for metric a")
|
||||||
return
|
return
|
||||||
@ -475,23 +474,23 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) {
|
|||||||
func BenchmarkMemoryStoreAggregation(b *testing.B) {
|
func BenchmarkMemoryStoreAggregation(b *testing.B) {
|
||||||
b.StopTimer()
|
b.StopTimer()
|
||||||
count := 2000
|
count := 2000
|
||||||
store := NewMemoryStore(map[string]MetricConfig{
|
store := NewMemoryStore(map[string]types.MetricConfig{
|
||||||
"flops_any": {Frequency: 1, Aggregation: AvgAggregation},
|
"flops_any": {Frequency: 1, Aggregation: types.AvgAggregation},
|
||||||
})
|
})
|
||||||
|
|
||||||
sel := []string{"testcluster", "host123", "cpu0"}
|
sel := []string{"testcluster", "host123", "cpu0"}
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
sel[2] = "cpu0"
|
sel[2] = "cpu0"
|
||||||
err := store.Write(sel, int64(i), []Metric{
|
err := store.Write(sel, int64(i), []types.Metric{
|
||||||
{Name: "flops_any", Value: Float(i)},
|
{Name: "flops_any", Value: types.Float(i)},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sel[2] = "cpu1"
|
sel[2] = "cpu1"
|
||||||
err = store.Write(sel, int64(i), []Metric{
|
err = store.Write(sel, int64(i), []types.Metric{
|
||||||
{Name: "flops_any", Value: Float(i)},
|
{Name: "flops_any", Value: types.Float(i)},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
@ -500,7 +499,7 @@ func BenchmarkMemoryStoreAggregation(b *testing.B) {
|
|||||||
|
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
data, from, to, err := store.Read(Selector{{String: "testcluster"}, {String: "host123"}}, "flops_any", 0, int64(count))
|
data, from, to, err := store.Read(types.Selector{{String: "testcluster"}, {String: "host123"}}, "flops_any", 0, int64(count))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user