mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2025-07-19 19:31:41 +02:00
Compare commits
8 Commits
v0.1.0
...
store_unit
Author | SHA1 | Date | |
---|---|---|---|
|
3f3a1501ff | ||
|
997dd8f2ee | ||
|
2d77dae30c | ||
|
5e5586f319 | ||
|
e71e1b123b | ||
|
7c891e1593 | ||
|
051cba4666 | ||
|
89acbe8db2 |
16
Makefile
16
Makefile
@@ -2,13 +2,13 @@
|
||||
APP = cc-metric-store
|
||||
GOSRC_APP := cc-metric-store.go
|
||||
GOSRC_FILES := api.go \
|
||||
memstore.go \
|
||||
archive.go \
|
||||
debug.go \
|
||||
float.go \
|
||||
lineprotocol.go \
|
||||
selector.go \
|
||||
stats.go
|
||||
memstore.go \
|
||||
archive.go \
|
||||
debug.go \
|
||||
float.go \
|
||||
lineprotocol.go \
|
||||
selector.go \
|
||||
stats.go
|
||||
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ BINDIR ?= bin
|
||||
.PHONY: all
|
||||
all: $(APP)
|
||||
|
||||
$(APP): $(GOSRC)
|
||||
$(APP): $(GOSRC) $(GOSRC_APP)
|
||||
go get
|
||||
go build -o $(APP) $(GOSRC_APP) $(GOSRC_FILES)
|
||||
|
||||
|
11
api.go
11
api.go
@@ -30,6 +30,7 @@ type ApiMetricData struct {
|
||||
Avg Float `json:"avg"`
|
||||
Min Float `json:"min"`
|
||||
Max Float `json:"max"`
|
||||
Unit string `json:"unit,omitempty"`
|
||||
}
|
||||
|
||||
// TODO: Optimize this, just like the stats endpoint!
|
||||
@@ -182,6 +183,7 @@ type ApiQueryRequest struct {
|
||||
From int64 `json:"from"`
|
||||
To int64 `json:"to"`
|
||||
WithStats bool `json:"with-stats"`
|
||||
WithUnit bool `json:"with-unit,omitempty"`
|
||||
WithData bool `json:"with-data"`
|
||||
WithPadding bool `json:"with-padding"`
|
||||
Queries []ApiQuery `json:"queries"`
|
||||
@@ -206,7 +208,7 @@ type ApiQuery struct {
|
||||
|
||||
func handleQuery(rw http.ResponseWriter, r *http.Request) {
|
||||
var err error
|
||||
var req ApiQueryRequest = ApiQueryRequest{WithStats: true, WithData: true, WithPadding: true}
|
||||
var req ApiQueryRequest = ApiQueryRequest{WithStats: true, WithData: true, WithPadding: true, WithUnit: true}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
@@ -281,7 +283,9 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) {
|
||||
res := make([]ApiMetricData, 0, len(sels))
|
||||
for _, sel := range sels {
|
||||
data := ApiMetricData{}
|
||||
data.Data, data.From, data.To, err = memoryStore.Read(sel, query.Metric, req.From, req.To)
|
||||
data.Unit = ""
|
||||
unit := ""
|
||||
data.Data, data.From, data.To, unit, err = memoryStore.Read(sel, query.Metric, req.From, req.To)
|
||||
// log.Printf("data: %#v, %#v, %#v, %#v", data.Data, data.From, data.To, err)
|
||||
if err != nil {
|
||||
msg := err.Error()
|
||||
@@ -299,6 +303,9 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) {
|
||||
if req.WithPadding {
|
||||
data.PadDataWithNull(req.From, req.To, query.Metric)
|
||||
}
|
||||
if req.WithUnit && len(unit) > 0 {
|
||||
data.Unit = unit
|
||||
}
|
||||
if !req.WithData {
|
||||
data.Data = nil
|
||||
}
|
||||
|
@@ -24,6 +24,7 @@ import (
|
||||
type CheckpointMetrics struct {
|
||||
Frequency int64 `json:"frequency"`
|
||||
Start int64 `json:"start"`
|
||||
Unit string `json:"unit,omitempty"`
|
||||
Data []Float `json:"data"`
|
||||
}
|
||||
|
||||
@@ -36,6 +37,7 @@ func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) {
|
||||
buf = strconv.AppendInt(buf, cm.Frequency, 10)
|
||||
buf = append(buf, `,"start":`...)
|
||||
buf = strconv.AppendInt(buf, cm.Start, 10)
|
||||
buf = append(buf, fmt.Sprintf(`,"unit":"%s"`, cm.Unit)...)
|
||||
buf = append(buf, `,"data":[`...)
|
||||
for i, x := range cm.Data {
|
||||
if i != 0 {
|
||||
@@ -179,6 +181,7 @@ func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFil
|
||||
Frequency: b.frequency,
|
||||
Start: start,
|
||||
Data: data,
|
||||
Unit: b.unit,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -309,6 +312,7 @@ func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
|
||||
frequency: metric.Frequency,
|
||||
start: metric.Start,
|
||||
data: metric.Data[0:n:n], // Space is wasted here :(
|
||||
unit: metric.Unit,
|
||||
prev: nil,
|
||||
next: nil,
|
||||
archived: true,
|
||||
|
54
archive_test.go
Normal file
54
archive_test.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestArchiveReadDir(t *testing.T) {
|
||||
memoryStore = NewMemoryStore(map[string]MetricConfig{
|
||||
"flops_dp": {Frequency: 5},
|
||||
"flops_sp": {Frequency: 5},
|
||||
})
|
||||
data := `{"from":1692628930,"to":1692629003,"metrics":{},"children":{"hwthread0":{"from":1692628930,"to":1692629003,"metrics":{"cpi":{"frequency":5,"start":1692628930,"unit":"","data":[2.2,0.7,2.1,1.6,2.3,0.7,0.9,1.8,0.7,1.7,1.5,1.8,0.9,1.6]},"flops_any":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.3,0.0,0.0,0.0,0.0,28.2,5.1,4.7,0.1,0.3,0.0,0.2,0.0,0.0]},"flops_dp":{"frequency":5,"start":1692628931,"unit":"MFLOP/s","data":[0.0,0.0,0.0,0.0,0.0,0.5,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0]},"flops_sp":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.3,0.0,0.0,0.0,0.0,27.2,4.8,4.7,0.0,0.3,0.0,0.2,0.0,0.0]}},"children":{}},"hwthread1":{"from":1692628930,"to":1692629003,"metrics":{"cpi":{"frequency":5,"start":1692628930,"unit":"","data":[0.9,4.2,4.2,3.8,3.1,0.7,1.3,0.7,1.8,1.6,5.0,3.0,4.9,5.2]},"flops_any":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.5,0.0,0.0,0.0,0.0,49.8,5.0,0.1,0.0,0.0,0.0,0.0,0.0,0.0]},"flops_dp":{"frequency":5,"start":1692628931,"unit":"MFLOP/s","data":[0.0,0.0,0.0,0.0,0.0,0.4,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0]},"flops_sp":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.5,0.0,0.0,0.0,0.0,49.1,4.9,0.1,0.0,0.0,0.0,0.0,0.0,0.0]}},"children":{}},"hwthread2":{"from":1692628930,"to":1692629003,"metrics":{"cpi":{"frequency":5,"start":1692628930,"unit":"","data":[1.8,0.9,2.2,2.1,4.3,1.4,0.8,1.0,1.1,1.6,3.9,4.9,2.7,5.1]},"flops_any":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.0,0.0,0.0,0.0,0.0,16.2,4.4,0.0,0.1,0.0,0.0,0.0,0.0,0.0]},"flops_dp":{"frequency":5,"start":1692628931,"unit":"MFLOP/s","data":[0.0,0.0,0.0,0.0,0.0,0.1,0.2,0.0,0.0,0.0,0.0,0.0,0.0,0.0]},"flops_sp":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.0,0.0,0.0,0.0,0.0,16.1,3.9,0.0,0.1,0.0,0.0,0.0,0.0,0.0]}},"children":{}},"hwthread3":{"from":1692628930,"to":1692629003,"metrics":{"cpi":{"frequency":5,"start":1692628930,"unit":"","data":[2.0,0.6,6.0,3.5,2.3,0.9,1.2,1.2,1.1,1.0,2.0,0.7,1.1,3.6]},"flops_any":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.0,0.0,0.0,0.0,0.0,1.6,11.5,0.0,0.0,0.0,0.0,0.2,0.0,0.0]},"flops_dp":{"frequency":5,"start":1692628931,"unit":"MFLOP/s","data":[0.0,0.0,0.0,0.0,0.0,0.1,0.4,0.0,0.0,0.0,0.0,0.1,0.0,0.0]},"flops_sp":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.0,0.0,0.0,0.0,0.0,1.4,10.6,0.0,0.0,0.0,0.0,0.0,0.0,0.0]}},"children":{}},"hwthread4":{"from":1692628930,"to":1692629003,"metrics":{"cpi":{"frequency":5,"start":1692628930,"unit":"","data":[1.0,1.5,1.3,0.9,0.8,0.8,1.2,1.2,1.4,1.0,0.8,2.3,2.7,0.8]},"flops_any":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.8,0.0,0.0,0.0,0.0,3.4,1.1,2.2,0.4,0.0,0.0,0.0,0.0,0.0]},"flops_dp":{"frequency":5,"start":1692628931,"unit":"MFLOP/s","data":[0.0,0.0,0.0,0.0,0.0,0.7,0.3,0.2,0.0,0.0,0.0,0.0,0.0,0.0]},"flops_sp":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.8,0.0,0.0,0.0,0.0,2.0,0.5,1.8,0.4,0.0,0.0,0.0,0.0,0.0]}},"children":{}},"hwthread5":{"from":1692628930,"to":1692629003,"metrics":{"cpi":{"frequency":5,"start":1692628930,"unit":"","data":[0.9,2.3,1.1,0.7,0.8,1.5,1.0,0.9,0.9,1.0,0.8,1.1,1.0,2.0]},"flops_any":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.5,0.0,1.9,0.1,0.3,3.0,13.8,0.0,1.7,1.1,0.0,0.0,0.5,1.0]},"flops_dp":{"frequency":5,"start":1692628931,"unit":"MFLOP/s","data":[0.3,0.0,0.1,0.0,0.2,0.1,0.9,0.0,0.8,0.6,0.0,0.0,0.0,0.0]},"flops_sp":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.0,0.0,1.7,0.0,0.0,2.9,12.0,0.0,0.0,0.0,0.0,0.0,0.5,1.0]}},"children":{}},"hwthread6":{"from":1692628930,"to":1692629003,"metrics":{"cpi":{"frequency":5,"start":1692628930,"unit":"","data":[1.4,0.8,0.8,1.2,1.4,0.8,1.2,1.1,1.5,0.9,1.5,1.5,1.4,0.8]},"flops_any":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.0,0.1,0.0,3.9,10.6,3.0,1.4,7.0,0.0,1.0,0.2,0.0,0.0,0.0]},"flops_dp":{"frequency":5,"start":1692628931,"unit":"MFLOP/s","data":[0.0,0.1,0.0,0.0,0.0,0.1,0.1,0.2,0.0,0.4,0.0,0.0,0.0,0.0]},"flops_sp":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.0,0.0,0.0,3.9,10.6,2.9,1.2,6.6,0.0,0.1,0.2,0.0,0.0,0.0]}},"children":{}},"hwthread7":{"from":1692628930,"to":1692629003,"metrics":{"cpi":{"frequency":5,"start":1692628930,"unit":"","data":[1.2,1.2,1.3,0.8,1.2,1.5,1.0,0.7,1.7,1.8,3.5,0.7,1.5,4.6]},"flops_any":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.0,0.0,0.0,0.3,0.0,2.3,4.7,0.0,12.5,0.7,0.2,0.2,0.0,0.0]},"flops_dp":{"frequency":5,"start":1692628931,"unit":"MFLOP/s","data":[0.0,0.0,0.0,0.0,0.0,0.1,0.9,0.0,0.0,0.0,0.0,0.0,0.0,0.0]},"flops_sp":{"frequency":5,"start":1692628930,"unit":"MFLOP/s","data":[0.0,0.0,0.0,0.3,0.0,2.2,2.8,0.0,12.5,0.7,0.2,0.2,0.0,0.0]}},"children":{}}}}`
|
||||
|
||||
tmpdir := t.TempDir()
|
||||
folder := filepath.Join(tmpdir, "testcluster", "nuc")
|
||||
filename := filepath.Join(folder, "1692628930.json")
|
||||
|
||||
err := os.MkdirAll(folder, 0755)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
f, err := os.Create(filename)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
f.WriteString(data)
|
||||
f.Close()
|
||||
|
||||
startupTime := time.Unix(1692628930, 0)
|
||||
_, err = memoryStore.FromCheckpoint(tmpdir, startupTime.Unix())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
querydata := ApiMetricData{}
|
||||
querysel := Selector{
|
||||
{String: "testcluster"},
|
||||
{String: "nuc"},
|
||||
{String: "hwthread0"}}
|
||||
|
||||
querydata.Data, querydata.From, querydata.To, querydata.Unit, err = memoryStore.Read(querysel, "flops_dp", 1692628930, time.Now().Unix())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if querydata.Unit != "MFLOP/s" {
|
||||
t.Errorf("Metric flops_dp does not have the right unit")
|
||||
return
|
||||
}
|
||||
}
|
3
debug.go
3
debug.go
@@ -21,6 +21,9 @@ func (b *buffer) debugDump(buf []byte) []byte {
|
||||
if b.archived {
|
||||
buf = append(buf, `,"saved":true`...)
|
||||
}
|
||||
if b.unit != "" {
|
||||
buf = append(buf, fmt.Sprintf(`,"unit":"%s"`, b.unit)...)
|
||||
}
|
||||
if b.next != nil {
|
||||
buf = append(buf, `},`...)
|
||||
} else {
|
||||
|
@@ -16,6 +16,7 @@ import (
|
||||
type Metric struct {
|
||||
Name string
|
||||
Value Float
|
||||
Unit string
|
||||
|
||||
mc MetricConfig
|
||||
}
|
||||
@@ -206,7 +207,7 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error {
|
||||
}
|
||||
|
||||
typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0]
|
||||
cluster, host := clusterDefault, ""
|
||||
cluster, host, unit := clusterDefault, "", ""
|
||||
for {
|
||||
key, val, err := dec.NextTag()
|
||||
if err != nil {
|
||||
@@ -232,6 +233,8 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error {
|
||||
host = string(val)
|
||||
lvl = nil
|
||||
}
|
||||
case "unit":
|
||||
unit = string(val)
|
||||
case "type":
|
||||
if string(val) == "node" {
|
||||
break
|
||||
@@ -300,6 +303,7 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error {
|
||||
} else {
|
||||
return fmt.Errorf("unsupported value type in message: %s", val.Kind().String())
|
||||
}
|
||||
metric.Unit = unit
|
||||
}
|
||||
|
||||
if t, err = dec.Time(lineprotocol.Second, t); err != nil {
|
||||
|
24
memstore.go
24
memstore.go
@@ -35,6 +35,7 @@ var (
|
||||
type buffer struct {
|
||||
frequency int64 // Time between two "slots"
|
||||
start int64 // Timestamp of when `data[0]` was written.
|
||||
unit string // Unit for the data in this buffer
|
||||
data []Float // The slice should never reallocacte as `cap(data)` is respected.
|
||||
prev, next *buffer // `prev` contains older data, `next` newer data.
|
||||
archived bool // If true, this buffer is already archived
|
||||
@@ -50,12 +51,13 @@ type buffer struct {
|
||||
*/
|
||||
}
|
||||
|
||||
func newBuffer(ts, freq int64) *buffer {
|
||||
func newBuffer(ts, freq int64, unit string) *buffer {
|
||||
b := bufferPool.Get().(*buffer)
|
||||
b.frequency = freq
|
||||
b.start = ts - (freq / 2)
|
||||
b.prev = nil
|
||||
b.next = nil
|
||||
b.unit = unit
|
||||
b.archived = false
|
||||
b.closed = false
|
||||
b.data = b.data[:0]
|
||||
@@ -74,7 +76,7 @@ func (b *buffer) write(ts int64, value Float) (*buffer, error) {
|
||||
// idx := int((ts - b.start + (b.frequency / 3)) / b.frequency)
|
||||
idx := int((ts - b.start) / b.frequency)
|
||||
if idx >= cap(b.data) {
|
||||
newbuf := newBuffer(ts, b.frequency)
|
||||
newbuf := newBuffer(ts, b.frequency, b.unit)
|
||||
newbuf.prev = b
|
||||
b.next = newbuf
|
||||
b.close()
|
||||
@@ -412,7 +414,7 @@ func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metric
|
||||
b := l.metrics[metric.mc.offset]
|
||||
if b == nil {
|
||||
// First write to this metric and level
|
||||
b = newBuffer(ts, metric.mc.Frequency)
|
||||
b = newBuffer(ts, metric.mc.Frequency, metric.Unit)
|
||||
l.metrics[metric.mc.offset] = b
|
||||
}
|
||||
|
||||
@@ -433,14 +435,15 @@ func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metric
|
||||
// If the level does not hold the metric itself, the data will be aggregated recursively from the children.
|
||||
// The second and third return value are the actual from/to for the data. Those can be different from
|
||||
// the range asked for if no data was available.
|
||||
func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]Float, int64, int64, error) {
|
||||
func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]Float, int64, int64, string, error) {
|
||||
var unit string = ""
|
||||
if from > to {
|
||||
return nil, 0, 0, errors.New("invalid time range")
|
||||
return nil, 0, 0, "", errors.New("invalid time range")
|
||||
}
|
||||
|
||||
minfo, ok := m.metrics[metric]
|
||||
if !ok {
|
||||
return nil, 0, 0, errors.New("unkown metric: " + metric)
|
||||
return nil, 0, 0, "", errors.New("unkown metric: " + metric)
|
||||
}
|
||||
|
||||
n, data := 0, make([]Float, (to-from)/minfo.Frequency+1)
|
||||
@@ -449,6 +452,7 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
unit = b.unit
|
||||
|
||||
if n == 0 {
|
||||
from, to = cfrom, cto
|
||||
@@ -476,9 +480,9 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, 0, err
|
||||
return nil, 0, 0, "", err
|
||||
} else if n == 0 {
|
||||
return nil, 0, 0, errors.New("metric or host not found")
|
||||
return nil, 0, 0, "", errors.New("metric or host not found")
|
||||
} else if n > 1 {
|
||||
if minfo.Aggregation == AvgAggregation {
|
||||
normalize := 1. / Float(n)
|
||||
@@ -486,11 +490,11 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([]
|
||||
data[i] *= normalize
|
||||
}
|
||||
} else if minfo.Aggregation != SumAggregation {
|
||||
return nil, 0, 0, errors.New("invalid aggregation")
|
||||
return nil, 0, 0, "", errors.New("invalid aggregation")
|
||||
}
|
||||
}
|
||||
|
||||
return data, from, to, nil
|
||||
return data, from, to, unit, nil
|
||||
}
|
||||
|
||||
// Release all buffers for the selected level and all its children that contain only
|
||||
|
@@ -28,12 +28,12 @@ func TestMemoryStoreBasics(t *testing.T) {
|
||||
}
|
||||
|
||||
sel := Selector{{String: "testhost"}}
|
||||
adata, from, to, err := store.Read(sel, "a", start, start+count*frequency)
|
||||
if err != nil || from != start || to != start+count*frequency {
|
||||
adata, from, to, unit, err := store.Read(sel, "a", start, start+count*frequency)
|
||||
if err != nil || from != start || to != start+count*frequency || unit != "" {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
bdata, _, _, err := store.Read(sel, "b", start, start+count*frequency)
|
||||
bdata, _, _, unit, err := store.Read(sel, "b", start, start+count*frequency)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
@@ -83,23 +83,23 @@ func TestMemoryStoreTooMuchWrites(t *testing.T) {
|
||||
}
|
||||
|
||||
end := start + int64(count)*frequency
|
||||
data, from, to, err := store.Read(Selector{{String: "test"}}, "a", start, end)
|
||||
if len(data) != count || from != start || to != end || err != nil {
|
||||
data, from, to, unit, err := store.Read(Selector{{String: "test"}}, "a", start, end)
|
||||
if len(data) != count || from != start || to != end || err != nil || unit != "" {
|
||||
t.Fatalf("a: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
|
||||
}
|
||||
|
||||
data, from, to, err = store.Read(Selector{{String: "test"}}, "b", start, end)
|
||||
if len(data) != count/2 || from != start || to != end || err != nil {
|
||||
data, from, to, unit, err = store.Read(Selector{{String: "test"}}, "b", start, end)
|
||||
if len(data) != count/2 || from != start || to != end || err != nil || unit != "" {
|
||||
t.Fatalf("b: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
|
||||
}
|
||||
|
||||
data, from, to, err = store.Read(Selector{{String: "test"}}, "c", start, end)
|
||||
if len(data) != count*2-1 || from != start || to != end-frequency/2 || err != nil {
|
||||
data, from, to, unit, err = store.Read(Selector{{String: "test"}}, "c", start, end)
|
||||
if len(data) != count*2-1 || from != start || to != end-frequency/2 || err != nil || unit != "" {
|
||||
t.Fatalf("c: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
|
||||
}
|
||||
|
||||
data, from, to, err = store.Read(Selector{{String: "test"}}, "d", start, end)
|
||||
if len(data) != count/3+1 || from != start || to != end+frequency*2 || err != nil {
|
||||
data, from, to, unit, err = store.Read(Selector{{String: "test"}}, "d", start, end)
|
||||
if len(data) != count/3+1 || from != start || to != end+frequency*2 || err != nil || unit != "" {
|
||||
t.Errorf("expected: err=nil, from=%d, to=%d, len(data)=%d\n", start, end+frequency*2, count/3)
|
||||
t.Fatalf("d: err=%#v, from=%d, to=%d, data=%#v\n", err, from, to, data)
|
||||
}
|
||||
@@ -121,7 +121,7 @@ func TestMemoryStoreOutOfBounds(t *testing.T) {
|
||||
}
|
||||
|
||||
sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu"}}
|
||||
data, from, to, err := store.Read(sel, "a", 500, int64(toffset+count*60+500))
|
||||
data, from, to, unit, err := store.Read(sel, "a", 500, int64(toffset+count*60+500))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -136,14 +136,14 @@ func TestMemoryStoreOutOfBounds(t *testing.T) {
|
||||
}
|
||||
|
||||
testfrom, testlen := int64(100000000), int64(10000)
|
||||
data, from, to, err = store.Read(sel, "a", testfrom, testfrom+testlen)
|
||||
if len(data) != 0 || from != testfrom || to != testfrom || err != nil {
|
||||
data, from, to, unit, err = store.Read(sel, "a", testfrom, testfrom+testlen)
|
||||
if len(data) != 0 || from != testfrom || to != testfrom || err != nil || unit != "" {
|
||||
t.Fatal("Unexpected data returned when reading range after valid data")
|
||||
}
|
||||
|
||||
testfrom, testlen = 0, 10
|
||||
data, from, to, err = store.Read(sel, "a", testfrom, testfrom+testlen)
|
||||
if len(data) != 0 || from/60 != int64(toffset)/60 || to/60 != int64(toffset)/60 || err != nil {
|
||||
data, from, to, unit, err = store.Read(sel, "a", testfrom, testfrom+testlen)
|
||||
if len(data) != 0 || from/60 != int64(toffset)/60 || to/60 != int64(toffset)/60 || err != nil || unit != "" {
|
||||
t.Fatal("Unexpected data returned when reading range before valid data")
|
||||
}
|
||||
}
|
||||
@@ -169,7 +169,7 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) {
|
||||
}
|
||||
|
||||
sel := Selector{{String: "testhost"}}
|
||||
adata, _, _, err := store.Read(sel, "a", 0, int64(count))
|
||||
adata, _, _, _, err := store.Read(sel, "a", 0, int64(count))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
@@ -219,7 +219,7 @@ func TestMemoryStoreAggregation(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
adata, from, to, err := store.Read(Selector{{String: "host0"}}, "a", int64(0), int64(count))
|
||||
adata, from, to, _, err := store.Read(Selector{{String: "host0"}}, "a", int64(0), int64(count))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
@@ -352,7 +352,7 @@ func TestMemoryStoreArchive(t *testing.T) {
|
||||
}
|
||||
|
||||
sel := Selector{{String: "cluster"}, {String: "host"}, {String: "cpu0"}}
|
||||
adata, from, to, err := store2.Read(sel, "a", 100, int64(100+count))
|
||||
adata, from, to, _, err := store2.Read(sel, "a", 100, int64(100+count))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
@@ -398,7 +398,7 @@ func TestMemoryStoreFree(t *testing.T) {
|
||||
t.Fatal("two buffers expected to be released")
|
||||
}
|
||||
|
||||
adata, from, to, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "a", 0, int64(count))
|
||||
adata, from, to, _, err := store.Read(Selector{{String: "cluster"}, {String: "host"}, {String: "1"}}, "a", 0, int64(count))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -451,7 +451,7 @@ func BenchmarkMemoryStoreConcurrentWrites(b *testing.B) {
|
||||
for g := 0; g < goroutines; g++ {
|
||||
host := fmt.Sprintf("host%d", g)
|
||||
sel := Selector{{String: "cluster"}, {String: host}, {String: "cpu0"}}
|
||||
adata, _, _, err := store.Read(sel, "a", 0, int64(count)*frequency)
|
||||
adata, _, _, _, err := store.Read(sel, "a", 0, int64(count)*frequency)
|
||||
if err != nil {
|
||||
b.Error(err)
|
||||
return
|
||||
@@ -500,7 +500,7 @@ func BenchmarkMemoryStoreAggregation(b *testing.B) {
|
||||
|
||||
b.StartTimer()
|
||||
for n := 0; n < b.N; n++ {
|
||||
data, from, to, err := store.Read(Selector{{String: "testcluster"}, {String: "host123"}}, "flops_any", 0, int64(count))
|
||||
data, from, to, _, err := store.Read(Selector{{String: "testcluster"}, {String: "host123"}}, "flops_any", 0, int64(count))
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user