diff --git a/memoryStore.go b/memoryStore.go index 9b1ff58..dbc6f00 100644 --- a/memoryStore.go +++ b/memoryStore.go @@ -3,18 +3,22 @@ package main import ( "fmt" "math" + "strings" + "sync" + "time" "github.com/ClusterCockpit/cc-metric-store/lineprotocol" ) type storeBuffer struct { - store []float64 + store []lineprotocol.Float start int64 } type buffer struct { current *storeBuffer next *storeBuffer + lock sync.Mutex } //MemoryStore holds the state for a metric memory store. @@ -25,21 +29,22 @@ type MemoryStore struct { frequency int numSlots int numMetrics int + lock sync.Mutex } func initBuffer(b *storeBuffer) { for i := 0; i < len(b.store); i++ { - b.store[i] = math.NaN() + b.store[i] = lineprotocol.Float(math.NaN()) } } func allocateBuffer(ts int64, size int) *buffer { b := new(buffer) - s := make([]float64, size) + s := make([]lineprotocol.Float, size) b.current = &storeBuffer{s, ts} initBuffer(b.current) - s = make([]float64, size) + s = make([]lineprotocol.Float, size) b.next = &storeBuffer{s, 0} initBuffer(b.next) return b @@ -75,13 +80,16 @@ func (m *MemoryStore) AddMetrics( ts int64, metrics []lineprotocol.Metric) error { + m.lock.Lock() b, ok := m.containers[key] - if !ok { //Key does not exist. Allocate new buffer. m.containers[key] = allocateBuffer(ts, m.numMetrics*m.numSlots) b = m.containers[key] } + m.lock.Unlock() + b.lock.Lock() + defer b.lock.Unlock() index := int(ts-b.current.start) / m.frequency @@ -111,14 +119,18 @@ func (m *MemoryStore) GetMetric( key string, metric string, from int64, - to int64) ([]float64, int64, error) { + to int64) ([]lineprotocol.Float, int64, error) { + m.lock.Lock() b, ok := m.containers[key] - + m.lock.Unlock() if !ok { return nil, 0, fmt.Errorf("key %s does not exist", key) } + b.lock.Lock() + defer b.lock.Unlock() + if to <= from { return nil, 0, fmt.Errorf("invalid duration %d - %d", from, to) } @@ -131,11 +143,11 @@ func (m *MemoryStore) GetMetric( return nil, 0, fmt.Errorf("to %d out of bounds", to) } - var values1, values2 []float64 - offset := m.offsets[metric] + var values1, values2 []lineprotocol.Float + offset := m.offsets[metric] * m.numSlots valuesFrom := from - if from < b.current.start { + if from < b.current.start && b.next.start != 0 { var start, stop = 0, m.numSlots @@ -171,3 +183,68 @@ func (m *MemoryStore) GetMetric( return append(values1, values2...), valuesFrom, nil } + +// Call *f* once on every value which *GetMetric* would +// return for similar arguments. This operation might be known +// as fold in Ruby/Haskell/Scala. It can be used to implement +// the calculation of sums, averages, minimas and maximas. +// The advantage of using this over *GetMetric* for such calculations +// is that it can be implemented without copying data. +// TODO: Write Tests, implement without calling GetMetric! +func (m *MemoryStore) Reduce( + key string, metric string, + from int64, to int64, + f func(t int64, acc lineprotocol.Float, x lineprotocol.Float) lineprotocol.Float, initialX lineprotocol.Float) (lineprotocol.Float, error) { + + values, valuesFrom, err := m.GetMetric(key, metric, from, to) + if err != nil { + return 0.0, err + } + + acc := initialX + t := valuesFrom + for i := 0; i < len(values); i++ { + acc = f(t, acc, values[i]) + t += int64(m.frequency) + } + + return acc, nil +} + +// Return a map of keys to a map of metrics to the most recent value writen to +// the store for that metric. +// TODO: Write Tests! +func (m *MemoryStore) Peak(prefix string) map[string]map[string]lineprotocol.Float { + m.lock.Lock() + defer m.lock.Unlock() + + now := time.Now().Unix() + + retval := make(map[string]map[string]lineprotocol.Float) + for key, b := range m.containers { + if !strings.HasPrefix(key, prefix) { + continue + } + + b.lock.Lock() + index := int(now-b.current.start) / m.frequency + if index >= m.numSlots { + index = m.numSlots - 1 + } + + vals := make(map[string]lineprotocol.Float) + for metric, offset := range m.offsets { + val := lineprotocol.Float(math.NaN()) + for i := index; i >= 0 && math.IsNaN(float64(val)); i -= 1 { + val = b.current.store[offset*m.numSlots+i] + } + + vals[metric] = val + } + + b.lock.Unlock() + retval[key[len(prefix):]] = vals + } + + return retval +} diff --git a/memoryStore_test.go b/memoryStore_test.go index 46c3941..b64c85b 100644 --- a/memoryStore_test.go +++ b/memoryStore_test.go @@ -261,7 +261,7 @@ func TestGetMetricGap(t *testing.T) { if val[0] != 100.5 { t.Errorf("Want 100.5 Got %f\n", val[0]) } - if !math.IsNaN(val[1]) { + if !math.IsNaN(float64(val[1])) { t.Errorf("Want NaN Got %f\n", val[1]) } if val[0] != 100.5 { diff --git a/metric-store.go b/metric-store.go index 38ec923..38a3564 100644 --- a/metric-store.go +++ b/metric-store.go @@ -15,7 +15,9 @@ import ( type MetricStore interface { AddMetrics(key string, ts int64, metrics []lineprotocol.Metric) error - GetMetric(key string, metric string, from int64, to int64) ([]float64, int64, error) + GetMetric(key string, metric string, from int64, to int64) ([]lineprotocol.Float, int64, error) + Reduce(key, metric string, from, to int64, f func(t int64, sum, x lineprotocol.Float) lineprotocol.Float, initialX lineprotocol.Float) (lineprotocol.Float, error) + Peak(prefix string) map[string]map[string]lineprotocol.Float } type Config struct { @@ -26,6 +28,7 @@ type Config struct { } var conf Config + var metricStores map[string]MetricStore = map[string]MetricStore{} func loadConfiguration(file string) Config { @@ -53,24 +56,38 @@ func buildKey(line *lineprotocol.Line) (string, error) { return "", errors.New("missing host tag") } + socket, ok := line.Tags["socket"] + if ok { + return cluster + ":" + host + ":s" + socket, nil + } + cpu, ok := line.Tags["cpu"] if ok { - return cluster + ":" + host + ":" + cpu, nil + return cluster + ":" + host + ":c" + cpu, nil } return cluster + ":" + host, nil } func handleLine(line *lineprotocol.Line) { - log.Printf("line: %v (t=%d)\n", line, line.Ts.Unix()) + store, ok := metricStores[line.Measurement] + if !ok { + log.Printf("unkown class: '%s'\n", line.Measurement) + return + } - store := metricStores[line.Measurement] key, err := buildKey(line) if err != nil { log.Println(err) + return } + // log.Printf("t=%d, key='%s', values=%v\n", line.Ts.Unix(), key, line.Fields) + log.Printf("new data: t=%d, key='%s'", line.Ts.Unix(), key) err = store.AddMetrics(key, line.Ts.Unix(), line.Fields) + if err != nil { + log.Println(err) + } } func main() {