diff --git a/api.go b/api.go index 66fa0ff..3fca3e5 100644 --- a/api.go +++ b/api.go @@ -62,8 +62,8 @@ func (data *ApiMetricData) PadDataWithNull(from, to int64, metric string) { return } - if (data.From / minfo.frequency) > (from / minfo.frequency) { - padfront := int((data.From / minfo.frequency) - (from / minfo.frequency)) + if (data.From / minfo.Frequency) > (from / minfo.Frequency) { + padfront := int((data.From / minfo.Frequency) - (from / minfo.Frequency)) ndata := make([]Float, 0, padfront+len(data.Data)) for i := 0; i < padfront; i++ { ndata = append(ndata, NaN) @@ -101,7 +101,7 @@ func handleFree(rw http.ResponseWriter, r *http.Request) { } bodyDec := json.NewDecoder(r.Body) - var selectors []Selector + var selectors [][]string err = bodyDec.Decode(&selectors) if err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) @@ -137,6 +137,7 @@ func handleWrite(rw http.ResponseWriter, r *http.Request) { dec := lineprotocol.NewDecoderWithBytes(bytes) if err := decodeLine(dec, r.URL.Query().Get("cluster")); err != nil { + log.Printf("/api/write error: %s", err.Error()) http.Error(rw, err.Error(), http.StatusBadRequest) return } diff --git a/cc-metric-store.go b/cc-metric-store.go index 25a3b21..f77457d 100644 --- a/cc-metric-store.go +++ b/cc-metric-store.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "flag" + "fmt" "log" "os" "os/signal" @@ -15,12 +16,43 @@ import ( "github.com/google/gops/agent" ) +// For aggregation over multiple values at different cpus/sockets/..., not time! +type AggregationStrategy int + +const ( + NoAggregation AggregationStrategy = iota + SumAggregation + AvgAggregation +) + +func (as *AggregationStrategy) UnmarshalJSON(data []byte) error { + var str string + if err := json.Unmarshal(data, &str); err != nil { + return err + } + + switch str { + case "": + *as = NoAggregation + case "sum": + *as = SumAggregation + case "avg": + *as = AvgAggregation + default: + return fmt.Errorf("invalid aggregation strategy: %#v", str) + } + return nil +} + type MetricConfig struct { // Interval in seconds at which measurements will arive. Frequency int64 `json:"frequency"` // Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy. - Aggregation string `json:"aggregation"` + Aggregation AggregationStrategy `json:"aggregation"` + + // Private, used internally... + offset int } type HttpConfig struct { @@ -107,7 +139,7 @@ func intervals(wg *sync.WaitGroup, ctx context.Context) { case <-ticks: t := time.Now().Add(-d) log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339)) - freed, err := memoryStore.Free(Selector{}, t.Unix()) + freed, err := memoryStore.Free(nil, t.Unix()) if err != nil { log.Printf("freeing up buffers failed: %s\n", err.Error()) } else { diff --git a/lineprotocol.go b/lineprotocol.go index 33562d8..f9d0b1b 100644 --- a/lineprotocol.go +++ b/lineprotocol.go @@ -114,9 +114,9 @@ func reorder(buf, prefix []byte) []byte { func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { // Reduce allocations in loop: t := time.Now() - metrics := make([]Metric, 0, 10) + metric, metricBuf := Metric{}, make([]byte, 0, 16) selector := make([]string, 0, 4) - typeBuf, subTypeBuf := make([]byte, 0, 20), make([]byte, 0) + typeBuf, subTypeBuf := make([]byte, 0, 16), make([]byte, 0) // Optimize for the case where all lines in a "batch" are about the same // cluster and host. By using `WriteToLevel` (level = host), we do not need @@ -124,24 +124,21 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { var lvl *level = nil var prevCluster, prevHost string = "", "" + var ok bool for dec.Next() { - metrics = metrics[:0] rawmeasurement, err := dec.Measurement() if err != nil { return err } - // A more dense lp format if supported if the measurement is 'data'. - // In that case, the field keys are used as metric names. - if string(rawmeasurement) != "data" { - minfo, ok := memoryStore.metrics[string(rawmeasurement)] - if !ok { - continue - } + // Needs to be copied because another call to dec.* would + // invalidate the returned slice. + metricBuf = append(metricBuf[:0], rawmeasurement...) - metrics = append(metrics, Metric{ - minfo: minfo, - }) + // The go compiler optimizes map[string(byteslice)] lookups: + metric.mc, ok = memoryStore.metrics[string(rawmeasurement)] + if !ok { + continue } typeBuf, subTypeBuf := typeBuf[:0], subTypeBuf[:0] @@ -176,6 +173,7 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { break } + // We cannot be sure that the "type" tag comes before the "type-id" tag: if len(typeBuf) == 0 { typeBuf = append(typeBuf, val...) } else { @@ -184,6 +182,7 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { case "type-id": typeBuf = append(typeBuf, val...) case "subtype": + // We cannot be sure that the "subtype" tag comes before the "stype-id" tag: if len(subTypeBuf) == 0 { subTypeBuf = append(subTypeBuf, val...) } else { @@ -197,6 +196,7 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { } } + // If the cluster or host changed, the lvl was set to nil if lvl == nil { selector = selector[:2] selector[0], selector[1] = cluster, host @@ -204,6 +204,7 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { prevCluster, prevHost = cluster, host } + // subtypes: selector = selector[:0] if len(typeBuf) > 0 { selector = append(selector, string(typeBuf)) // <- Allocation :( @@ -212,71 +213,34 @@ func decodeLine(dec *lineprotocol.Decoder, clusterDefault string) error { } } - if len(metrics) == 0 { - for { - key, val, err := dec.NextField() - if err != nil { - return err - } - - if key == nil { - break - } - - var value Float - if val.Kind() == lineprotocol.Float { - value = Float(val.FloatV()) - } else if val.Kind() == lineprotocol.Int { - value = Float(val.IntV()) - } else { - return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) - } - - minfo, ok := memoryStore.metrics[string(key)] - if !ok { - continue - } - - metrics = append(metrics, Metric{ - minfo: minfo, - Value: value, - }) - } - } else { - var value Float - for { - key, val, err := dec.NextField() - if err != nil { - return err - } - - if key == nil { - break - } - - if string(key) != "value" { - return fmt.Errorf("unkown field: '%s' (value: %#v)", string(key), val) - } - - if val.Kind() == lineprotocol.Float { - value = Float(val.FloatV()) - } else if val.Kind() == lineprotocol.Int { - value = Float(val.IntV()) - } else { - return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) - } + for { + key, val, err := dec.NextField() + if err != nil { + return err } - metrics[0].Value = value + if key == nil { + break + } + + if string(key) != "value" { + return fmt.Errorf("unkown field: '%s' (value: %#v)", string(key), val) + } + + if val.Kind() == lineprotocol.Float { + metric.Value = Float(val.FloatV()) + } else if val.Kind() == lineprotocol.Int { + metric.Value = Float(val.IntV()) + } else { + return fmt.Errorf("unsupported value type in message: %s", val.Kind().String()) + } } - t, err = dec.Time(lineprotocol.Second, t) - if err != nil { + if t, err = dec.Time(lineprotocol.Second, t); err != nil { return err } - // log.Printf("write: %s (%v) -> %v\n", string(measurement), selector, value) - if err := memoryStore.WriteToLevel(lvl, selector, t.Unix(), metrics); err != nil { + if err := memoryStore.WriteToLevel(lvl, selector, t.Unix(), []Metric{metric}); err != nil { return err } } diff --git a/memstore.go b/memstore.go index 017288d..023883b 100644 --- a/memstore.go +++ b/memstore.go @@ -192,35 +192,26 @@ func (b *buffer) read(from, to int64, data []Float) ([]Float, int64, int64, erro return data[:i], from, t, nil } -// Free up and free all buffers in the chain only containing data -// older than `t`. -func (b *buffer) free(t int64) (int, error) { - end := b.end() - if end < t && b.next != nil { - b.next.prev = nil - n := 0 - for b != nil { - prev := b.prev - if prev != nil && prev.start > b.start { - panic("time travel?") - } - - n += 1 - // Buffers that come from the - // archive should not be reused. - if cap(b.data) == BUFFER_CAP { - bufferPool.Put(b) - } - b = prev - } - return n, nil - } - +// Returns true if this buffer needs to be freed. +func (b *buffer) free(t int64) (delme bool, n int) { if b.prev != nil { - return b.prev.free(t) + delme, m := b.prev.free(t) + n += m + if delme { + b.prev.next = nil + if cap(b.prev.data) == BUFFER_CAP { + bufferPool.Put(b.prev) + } + b.prev = nil + } } - return 0, nil + end := b.end() + if end < t { + return true, n + 1 + } + + return false, n } // Call `callback` on every buffer that contains data in the range from `from` to `to`. @@ -300,52 +291,54 @@ func (l *level) findLevelOrCreate(selector []string, nMetrics int) *level { return child.findLevelOrCreate(selector[1:], nMetrics) } -// For aggregation over multiple values at different cpus/sockets/..., not time! -type AggregationStrategy int +func (l *level) free(t int64) (int, error) { + l.lock.Lock() + defer l.lock.Unlock() -const ( - NoAggregation AggregationStrategy = iota - SumAggregation - AvgAggregation -) + n := 0 + for i, b := range l.metrics { + if b != nil { + delme, m := b.free(t) + n += m + if delme { + if cap(b.data) == BUFFER_CAP { + bufferPool.Put(b) + } + l.metrics[i] = nil + } + } + } -type metricInfo struct { - offset int - aggregation AggregationStrategy - frequency int64 + for _, l := range l.children { + m, err := l.free(t) + n += m + if err != nil { + return n, err + } + } + + return n, nil } type MemoryStore struct { root level // root of the tree structure - metrics map[string]metricInfo + metrics map[string]MetricConfig } // Return a new, initialized instance of a MemoryStore. // Will panic if values in the metric configurations are invalid. func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { - ms := make(map[string]metricInfo) - offset := 0 for key, config := range metrics { - aggregation := NoAggregation - if config.Aggregation == "sum" { - aggregation = SumAggregation - } else if config.Aggregation == "avg" { - aggregation = AvgAggregation - } else if config.Aggregation != "" { - panic("invalid aggregation strategy: " + config.Aggregation) - } - if config.Frequency == 0 { panic("invalid frequency") } - ms[key] = metricInfo{ + metrics[key] = MetricConfig{ + Frequency: config.Frequency, + Aggregation: config.Aggregation, offset: offset, - aggregation: aggregation, - frequency: config.Frequency, } - offset += 1 } @@ -354,7 +347,7 @@ func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { metrics: make([]*buffer, len(metrics)), children: make(map[string]*level), }, - metrics: ms, + metrics: metrics, } } @@ -363,10 +356,10 @@ func NewMemoryStore(metrics map[string]MetricConfig) *MemoryStore { func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error { var ok bool for i, metric := range metrics { - if metric.minfo.frequency == 0 { - metric.minfo, ok = m.metrics[metric.Name] + if metric.mc.Frequency == 0 { + metric.mc, ok = m.metrics[metric.Name] if !ok { - metric.minfo.frequency = 0 + metric.mc.Frequency = 0 } metrics[i] = metric } @@ -386,15 +379,15 @@ func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metric defer l.lock.Unlock() for _, metric := range metrics { - if metric.minfo.frequency == 0 { + if metric.mc.Frequency == 0 { continue } - b := l.metrics[metric.minfo.offset] + b := l.metrics[metric.mc.offset] if b == nil { // First write to this metric and level - b = newBuffer(ts, metric.minfo.frequency) - l.metrics[metric.minfo.offset] = b + b = newBuffer(ts, metric.mc.Frequency) + l.metrics[metric.mc.offset] = b } nb, err := b.write(ts, metric.Value) @@ -404,7 +397,7 @@ func (m *MemoryStore) WriteToLevel(l *level, selector []string, ts int64, metric // Last write created a new buffer... if b != nb { - l.metrics[metric.minfo.offset] = nb + l.metrics[metric.mc.offset] = nb } } return nil @@ -424,7 +417,7 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([] return nil, 0, 0, errors.New("unkown metric: " + metric) } - n, data := 0, make([]Float, (to-from)/minfo.frequency+1) + n, data := 0, make([]Float, (to-from)/minfo.Frequency+1) err := m.root.findBuffers(selector, minfo.offset, func(b *buffer) error { cdata, cfrom, cto, err := b.read(from, to, data) if err != nil { @@ -434,7 +427,17 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([] if n == 0 { from, to = cfrom, cto } else if from != cfrom || to != cto || len(data) != len(cdata) { - return ErrDataDoesNotAlign + missingfront, missingback := int((from-cfrom)/minfo.Frequency), int((to-cto)/minfo.Frequency) + if missingfront != 0 { + return ErrDataDoesNotAlign + } + + cdata = cdata[0 : len(cdata)-missingback] + if len(cdata) != len(data) { + return ErrDataDoesNotAlign + } + + from, to = cfrom, cto } data = cdata @@ -447,12 +450,12 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([] } else if n == 0 { return nil, 0, 0, errors.New("metric or host not found") } else if n > 1 { - if minfo.aggregation == AvgAggregation { + if minfo.Aggregation == AvgAggregation { normalize := 1. / Float(n) for i := 0; i < len(data); i++ { data[i] *= normalize } - } else if minfo.aggregation != SumAggregation { + } else if minfo.Aggregation != SumAggregation { return nil, 0, 0, errors.New("invalid aggregation") } } @@ -462,14 +465,8 @@ func (m *MemoryStore) Read(selector Selector, metric string, from, to int64) ([] // Release all buffers for the selected level and all its children that contain only // values older than `t`. -func (m *MemoryStore) Free(selector Selector, t int64) (int, error) { - n := 0 - err := m.root.findBuffers(selector, -1, func(b *buffer) error { - m, err := b.free(t) - n += m - return err - }) - return n, err +func (m *MemoryStore) Free(selector []string, t int64) (int, error) { + return m.GetLevel(selector).free(t) } func (m *MemoryStore) FreeAll() error { diff --git a/memstore_test.go b/memstore_test.go index 5c4ab52..ef02166 100644 --- a/memstore_test.go +++ b/memstore_test.go @@ -201,8 +201,8 @@ func TestMemoryStoreMissingDatapoints(t *testing.T) { func TestMemoryStoreAggregation(t *testing.T) { count := 3000 store := NewMemoryStore(map[string]MetricConfig{ - "a": {Frequency: 1, Aggregation: "sum"}, - "b": {Frequency: 2, Aggregation: "avg"}, + "a": {Frequency: 1, Aggregation: SumAggregation}, + "b": {Frequency: 2, Aggregation: AvgAggregation}, }) for i := 0; i < count; i++ { @@ -269,7 +269,7 @@ func TestMemoryStoreStats(t *testing.T) { count := 3000 store := NewMemoryStore(map[string]MetricConfig{ "a": {Frequency: 1}, - "b": {Frequency: 1, Aggregation: "avg"}, + "b": {Frequency: 1, Aggregation: AvgAggregation}, }) sel1 := []string{"cluster", "host1"} @@ -415,7 +415,7 @@ func TestMemoryStoreFree(t *testing.T) { } } - n, err := store.Free(Selector{{String: "cluster"}, {String: "host"}}, int64(BUFFER_CAP*2)+100) + n, err := store.Free([]string{"cluster", "host"}, int64(BUFFER_CAP*2)+100) if err != nil { t.Fatal(err) } @@ -501,7 +501,7 @@ func BenchmarkMemoryStoreAggregation(b *testing.B) { b.StopTimer() count := 2000 store := NewMemoryStore(map[string]MetricConfig{ - "flops_any": {Frequency: 1, Aggregation: "avg"}, + "flops_any": {Frequency: 1, Aggregation: AvgAggregation}, }) sel := []string{"testcluster", "host123", "cpu0"} diff --git a/selector.go b/selector.go index a2ff52e..de17ed5 100644 --- a/selector.go +++ b/selector.go @@ -55,20 +55,9 @@ func (l *level) findBuffers(selector Selector, offset int, f func(b *buffer) err defer l.lock.RUnlock() if len(selector) == 0 { - if offset == -1 { - for _, b := range l.metrics { - if b != nil { - err := f(b) - if err != nil { - return err - } - } - } - } else { - b := l.metrics[offset] - if b != nil { - return f(b) - } + b := l.metrics[offset] + if b != nil { + return f(b) } for _, lvl := range l.children { diff --git a/stats.go b/stats.go index 2e56dd5..510891b 100644 --- a/stats.go +++ b/stats.go @@ -103,9 +103,9 @@ func (m *MemoryStore) Stats(selector Selector, metric string, from, to int64) (* return nil, 0, 0, ErrNoData } - if minfo.aggregation == AvgAggregation { + if minfo.Aggregation == AvgAggregation { avg /= Float(n) - } else if n > 1 && minfo.aggregation != SumAggregation { + } else if n > 1 && minfo.Aggregation != SumAggregation { return nil, 0, 0, errors.New("invalid aggregation") }