mirror of
				https://github.com/ClusterCockpit/cc-metric-store.git
				synced 2025-10-25 15:25:07 +02:00 
			
		
		
		
	Add functions to MetricStore for future queries
This commit is contained in:
		| @@ -3,18 +3,22 @@ package main | |||||||
| import ( | import ( | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"math" | 	"math" | ||||||
|  | 	"strings" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/ClusterCockpit/cc-metric-store/lineprotocol" | 	"github.com/ClusterCockpit/cc-metric-store/lineprotocol" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type storeBuffer struct { | type storeBuffer struct { | ||||||
| 	store []float64 | 	store []lineprotocol.Float | ||||||
| 	start int64 | 	start int64 | ||||||
| } | } | ||||||
|  |  | ||||||
| type buffer struct { | type buffer struct { | ||||||
| 	current *storeBuffer | 	current *storeBuffer | ||||||
| 	next    *storeBuffer | 	next    *storeBuffer | ||||||
|  | 	lock    sync.Mutex | ||||||
| } | } | ||||||
|  |  | ||||||
| //MemoryStore holds the state for a metric memory store. | //MemoryStore holds the state for a metric memory store. | ||||||
| @@ -25,21 +29,22 @@ type MemoryStore struct { | |||||||
| 	frequency  int | 	frequency  int | ||||||
| 	numSlots   int | 	numSlots   int | ||||||
| 	numMetrics int | 	numMetrics int | ||||||
|  | 	lock       sync.Mutex | ||||||
| } | } | ||||||
|  |  | ||||||
| func initBuffer(b *storeBuffer) { | func initBuffer(b *storeBuffer) { | ||||||
| 	for i := 0; i < len(b.store); i++ { | 	for i := 0; i < len(b.store); i++ { | ||||||
| 		b.store[i] = math.NaN() | 		b.store[i] = lineprotocol.Float(math.NaN()) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func allocateBuffer(ts int64, size int) *buffer { | func allocateBuffer(ts int64, size int) *buffer { | ||||||
| 	b := new(buffer) | 	b := new(buffer) | ||||||
| 	s := make([]float64, size) | 	s := make([]lineprotocol.Float, size) | ||||||
| 	b.current = &storeBuffer{s, ts} | 	b.current = &storeBuffer{s, ts} | ||||||
| 	initBuffer(b.current) | 	initBuffer(b.current) | ||||||
|  |  | ||||||
| 	s = make([]float64, size) | 	s = make([]lineprotocol.Float, size) | ||||||
| 	b.next = &storeBuffer{s, 0} | 	b.next = &storeBuffer{s, 0} | ||||||
| 	initBuffer(b.next) | 	initBuffer(b.next) | ||||||
| 	return b | 	return b | ||||||
| @@ -75,13 +80,16 @@ func (m *MemoryStore) AddMetrics( | |||||||
| 	ts int64, | 	ts int64, | ||||||
| 	metrics []lineprotocol.Metric) error { | 	metrics []lineprotocol.Metric) error { | ||||||
|  |  | ||||||
|  | 	m.lock.Lock() | ||||||
| 	b, ok := m.containers[key] | 	b, ok := m.containers[key] | ||||||
|  |  | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		//Key does not exist. Allocate new buffer. | 		//Key does not exist. Allocate new buffer. | ||||||
| 		m.containers[key] = allocateBuffer(ts, m.numMetrics*m.numSlots) | 		m.containers[key] = allocateBuffer(ts, m.numMetrics*m.numSlots) | ||||||
| 		b = m.containers[key] | 		b = m.containers[key] | ||||||
| 	} | 	} | ||||||
|  | 	m.lock.Unlock() | ||||||
|  | 	b.lock.Lock() | ||||||
|  | 	defer b.lock.Unlock() | ||||||
|  |  | ||||||
| 	index := int(ts-b.current.start) / m.frequency | 	index := int(ts-b.current.start) / m.frequency | ||||||
|  |  | ||||||
| @@ -111,14 +119,18 @@ func (m *MemoryStore) GetMetric( | |||||||
| 	key string, | 	key string, | ||||||
| 	metric string, | 	metric string, | ||||||
| 	from int64, | 	from int64, | ||||||
| 	to int64) ([]float64, int64, error) { | 	to int64) ([]lineprotocol.Float, int64, error) { | ||||||
|  |  | ||||||
|  | 	m.lock.Lock() | ||||||
| 	b, ok := m.containers[key] | 	b, ok := m.containers[key] | ||||||
|  | 	m.lock.Unlock() | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		return nil, 0, fmt.Errorf("key %s does not exist", key) | 		return nil, 0, fmt.Errorf("key %s does not exist", key) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	b.lock.Lock() | ||||||
|  | 	defer b.lock.Unlock() | ||||||
|  |  | ||||||
| 	if to <= from { | 	if to <= from { | ||||||
| 		return nil, 0, fmt.Errorf("invalid duration %d - %d", from, to) | 		return nil, 0, fmt.Errorf("invalid duration %d - %d", from, to) | ||||||
| 	} | 	} | ||||||
| @@ -131,11 +143,11 @@ func (m *MemoryStore) GetMetric( | |||||||
| 		return nil, 0, fmt.Errorf("to %d out of bounds", to) | 		return nil, 0, fmt.Errorf("to %d out of bounds", to) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	var values1, values2 []float64 | 	var values1, values2 []lineprotocol.Float | ||||||
| 	offset := m.offsets[metric] | 	offset := m.offsets[metric] * m.numSlots | ||||||
| 	valuesFrom := from | 	valuesFrom := from | ||||||
|  |  | ||||||
| 	if from < b.current.start { | 	if from < b.current.start && b.next.start != 0 { | ||||||
|  |  | ||||||
| 		var start, stop = 0, m.numSlots | 		var start, stop = 0, m.numSlots | ||||||
|  |  | ||||||
| @@ -171,3 +183,68 @@ func (m *MemoryStore) GetMetric( | |||||||
|  |  | ||||||
| 	return append(values1, values2...), valuesFrom, nil | 	return append(values1, values2...), valuesFrom, nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Call *f* once on every value which *GetMetric* would | ||||||
|  | // return for similar arguments. This operation might be known | ||||||
|  | // as fold in Ruby/Haskell/Scala. It can be used to implement | ||||||
|  | // the calculation of sums, averages, minimas and maximas. | ||||||
|  | // The advantage of using this over *GetMetric* for such calculations | ||||||
|  | // is that it can be implemented without copying data. | ||||||
|  | // TODO: Write Tests, implement without calling GetMetric! | ||||||
|  | func (m *MemoryStore) Reduce( | ||||||
|  | 	key string, metric string, | ||||||
|  | 	from int64, to int64, | ||||||
|  | 	f func(t int64, acc lineprotocol.Float, x lineprotocol.Float) lineprotocol.Float, initialX lineprotocol.Float) (lineprotocol.Float, error) { | ||||||
|  |  | ||||||
|  | 	values, valuesFrom, err := m.GetMetric(key, metric, from, to) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return 0.0, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	acc := initialX | ||||||
|  | 	t := valuesFrom | ||||||
|  | 	for i := 0; i < len(values); i++ { | ||||||
|  | 		acc = f(t, acc, values[i]) | ||||||
|  | 		t += int64(m.frequency) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return acc, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Return a map of keys to a map of metrics to the most recent value writen to | ||||||
|  | // the store for that metric. | ||||||
|  | // TODO: Write Tests! | ||||||
|  | func (m *MemoryStore) Peak(prefix string) map[string]map[string]lineprotocol.Float { | ||||||
|  | 	m.lock.Lock() | ||||||
|  | 	defer m.lock.Unlock() | ||||||
|  |  | ||||||
|  | 	now := time.Now().Unix() | ||||||
|  |  | ||||||
|  | 	retval := make(map[string]map[string]lineprotocol.Float) | ||||||
|  | 	for key, b := range m.containers { | ||||||
|  | 		if !strings.HasPrefix(key, prefix) { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		b.lock.Lock() | ||||||
|  | 		index := int(now-b.current.start) / m.frequency | ||||||
|  | 		if index >= m.numSlots { | ||||||
|  | 			index = m.numSlots - 1 | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		vals := make(map[string]lineprotocol.Float) | ||||||
|  | 		for metric, offset := range m.offsets { | ||||||
|  | 			val := lineprotocol.Float(math.NaN()) | ||||||
|  | 			for i := index; i >= 0 && math.IsNaN(float64(val)); i -= 1 { | ||||||
|  | 				val = b.current.store[offset*m.numSlots+i] | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			vals[metric] = val | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		b.lock.Unlock() | ||||||
|  | 		retval[key[len(prefix):]] = vals | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return retval | ||||||
|  | } | ||||||
|   | |||||||
| @@ -261,7 +261,7 @@ func TestGetMetricGap(t *testing.T) { | |||||||
| 	if val[0] != 100.5 { | 	if val[0] != 100.5 { | ||||||
| 		t.Errorf("Want 100.5 Got %f\n", val[0]) | 		t.Errorf("Want 100.5 Got %f\n", val[0]) | ||||||
| 	} | 	} | ||||||
| 	if !math.IsNaN(val[1]) { | 	if !math.IsNaN(float64(val[1])) { | ||||||
| 		t.Errorf("Want NaN Got %f\n", val[1]) | 		t.Errorf("Want NaN Got %f\n", val[1]) | ||||||
| 	} | 	} | ||||||
| 	if val[0] != 100.5 { | 	if val[0] != 100.5 { | ||||||
|   | |||||||
| @@ -15,7 +15,9 @@ import ( | |||||||
|  |  | ||||||
| type MetricStore interface { | type MetricStore interface { | ||||||
| 	AddMetrics(key string, ts int64, metrics []lineprotocol.Metric) error | 	AddMetrics(key string, ts int64, metrics []lineprotocol.Metric) error | ||||||
| 	GetMetric(key string, metric string, from int64, to int64) ([]float64, int64, error) | 	GetMetric(key string, metric string, from int64, to int64) ([]lineprotocol.Float, int64, error) | ||||||
|  | 	Reduce(key, metric string, from, to int64, f func(t int64, sum, x lineprotocol.Float) lineprotocol.Float, initialX lineprotocol.Float) (lineprotocol.Float, error) | ||||||
|  | 	Peak(prefix string) map[string]map[string]lineprotocol.Float | ||||||
| } | } | ||||||
|  |  | ||||||
| type Config struct { | type Config struct { | ||||||
| @@ -26,6 +28,7 @@ type Config struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| var conf Config | var conf Config | ||||||
|  |  | ||||||
| var metricStores map[string]MetricStore = map[string]MetricStore{} | var metricStores map[string]MetricStore = map[string]MetricStore{} | ||||||
|  |  | ||||||
| func loadConfiguration(file string) Config { | func loadConfiguration(file string) Config { | ||||||
| @@ -53,24 +56,38 @@ func buildKey(line *lineprotocol.Line) (string, error) { | |||||||
| 		return "", errors.New("missing host tag") | 		return "", errors.New("missing host tag") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	socket, ok := line.Tags["socket"] | ||||||
|  | 	if ok { | ||||||
|  | 		return cluster + ":" + host + ":s" + socket, nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	cpu, ok := line.Tags["cpu"] | 	cpu, ok := line.Tags["cpu"] | ||||||
| 	if ok { | 	if ok { | ||||||
| 		return cluster + ":" + host + ":" + cpu, nil | 		return cluster + ":" + host + ":c" + cpu, nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return cluster + ":" + host, nil | 	return cluster + ":" + host, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func handleLine(line *lineprotocol.Line) { | func handleLine(line *lineprotocol.Line) { | ||||||
| 	log.Printf("line: %v (t=%d)\n", line, line.Ts.Unix()) | 	store, ok := metricStores[line.Measurement] | ||||||
|  | 	if !ok { | ||||||
|  | 		log.Printf("unkown class: '%s'\n", line.Measurement) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	store := metricStores[line.Measurement] |  | ||||||
| 	key, err := buildKey(line) | 	key, err := buildKey(line) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Println(err) | 		log.Println(err) | ||||||
|  | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// log.Printf("t=%d, key='%s', values=%v\n", line.Ts.Unix(), key, line.Fields) | ||||||
|  | 	log.Printf("new data: t=%d, key='%s'", line.Ts.Unix(), key) | ||||||
| 	err = store.AddMetrics(key, line.Ts.Unix(), line.Fields) | 	err = store.AddMetrics(key, line.Ts.Unix(), line.Fields) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Println(err) | ||||||
|  | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func main() { | func main() { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user