// cc-metric-store/internal/memorystore/memorystore.go
// (web-view export header: 292 lines, 6.7 KiB, Go)

package memorystore
import (
2024-05-06 14:20:43 +02:00
"context"
"errors"
"log"
2024-05-06 14:20:43 +02:00
"runtime"
"sync"
2024-05-06 14:20:43 +02:00
"time"
"github.com/ClusterCockpit/cc-metric-store/internal/config"
"github.com/ClusterCockpit/cc-metric-store/internal/util"
2024-08-21 09:47:16 +02:00
"github.com/ClusterCockpit/cc-metric-store/pkg/resampler"
)
// Package-level state backing the process-wide MemoryStore.
var (
// singleton guards the body of Init so that it runs at most once.
singleton sync.Once
// msInstance is the store created by Init and handed out by GetMemoryStore;
// it stays nil until Init has completed successfully.
msInstance *MemoryStore
)
2024-05-06 14:20:43 +02:00
// NumWorkers holds a worker count. It starts at 4 and is overwritten at
// package initialization with a value derived from the number of CPUs.
var NumWorkers = 4

// init sets NumWorkers to half the CPU count plus one, capped at 10.
func init() {
	const workerLimit = 10

	NumWorkers = workerLimit
	if half := runtime.NumCPU()/2 + 1; half < workerLimit {
		NumWorkers = half
	}
}
// Metric is a single named sample as passed to MemoryStore.Write and
// MemoryStore.WriteToLevel.
type Metric struct {
// Name is the key Write uses to look up the configuration in MemoryStore.Metrics.
Name string
// Value is the sample handed to the metric's buffer on write.
Value util.Float
// MetricConfig carries the metric's settings. A zero Frequency means
// "not resolved yet": Write fills the config in by Name, and WriteToLevel
// skips any metric whose Frequency is still zero.
MetricConfig config.MetricConfig
}
type MemoryStore struct {
Metrics map[string]config.MetricConfig
2024-05-06 14:20:43 +02:00
root Level
}
// Init creates the package-wide MemoryStore from the given metric
// configurations. Only the first call has any effect; later calls are no-ops.
//
// Every entry of `metrics` is rewritten in place (the caller's map is
// modified) with a distinct Offset, which is the index of that metric in each
// level's buffer slice. Offsets follow map iteration order and are therefore
// not stable between runs.
//
// Init panics if a metric's Frequency is not strictly positive: the frequency
// is used as a divisor and to size slices when reading, so zero and negative
// values are both invalid.
func Init(metrics map[string]config.MetricConfig) {
	singleton.Do(func() {
		offset := 0
		for key, cfg := range metrics {
			if cfg.Frequency <= 0 {
				panic("invalid frequency")
			}

			metrics[key] = config.MetricConfig{
				Frequency:   cfg.Frequency,
				Aggregation: cfg.Aggregation,
				Offset:      offset,
			}
			offset++
		}

		msInstance = &MemoryStore{
			root: Level{
				metrics:  make([]*buffer, len(metrics)),
				children: make(map[string]*Level),
			},
			Metrics: metrics,
		}
	})
}
// GetMemoryStore returns the package-wide MemoryStore.
// If no store has been set up yet, it logs a fatal message, which terminates
// the process.
func GetMemoryStore() *MemoryStore {
	if msInstance != nil {
		return msInstance
	}

	log.Fatalf("MemoryStore not initialized!")
	return msInstance
}
2024-05-06 14:20:43 +02:00
// Shutdown writes a checkpoint covering the span from lastCheckpoint until now
// into the configured checkpoint root directory. A failure is only logged; the
// number of files written is logged in either case.
func Shutdown() {
	store := GetMemoryStore()

	log.Printf("Writing to '%s'...\n", config.Keys.Checkpoints.RootDir)

	written, err := store.ToCheckpoint(
		config.Keys.Checkpoints.RootDir,
		lastCheckpoint.Unix(),
		time.Now().Unix(),
	)
	if err != nil {
		log.Printf("Writing checkpoint failed: %s\n", err.Error())
	}

	log.Printf("Done! (%d files written)\n", written)
}
// Retention starts a background goroutine that periodically frees buffers
// older than the configured in-memory retention time.
//
// The retention time is parsed from config.Keys.RetentionInMemory; a value
// that cannot be parsed terminates the process via log.Fatal, and a
// non-positive duration disables the goroutine immediately. Buffers are
// checked every half retention period. The goroutine exits when ctx is
// cancelled and calls wg.Done on exit, so the caller is expected to have
// accounted for it with wg.Add beforehand.
func Retention(wg *sync.WaitGroup, ctx context.Context) {
	ms := GetMemoryStore()

	go func() {
		defer wg.Done()

		d, err := time.ParseDuration(config.Keys.RetentionInMemory)
		if err != nil {
			log.Fatal(err)
		}
		if d <= 0 {
			return
		}

		// A nil channel blocks forever in select, so when the halved interval
		// rounds down to zero the loop below only waits for cancellation.
		var ticks <-chan time.Time
		if interval := d / 2; interval > 0 {
			ticker := time.NewTicker(interval)
			// Release the ticker's resources once the goroutine returns.
			defer ticker.Stop()
			ticks = ticker.C
		}

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticks:
				t := time.Now().Add(-d)
				log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
				freed, err := ms.Free(nil, t.Unix())
				if err != nil {
					log.Printf("freeing up buffers failed: %s\n", err.Error())
				} else {
					log.Printf("done: %d buffers freed\n", freed)
				}
			}
		}
	}()
}
// Write stores every value in `metrics` at the level addressed by `selector`
// for the timestamp `ts`. See `findLevelOrCreate` for how selectors work.
//
// Metrics that arrive without a configuration (Frequency == 0) have it filled
// in by name from m.Metrics; the `metrics` slice is updated in place. A name
// that is not configured ends up with the zero configuration and is skipped
// by WriteToLevel.
func (m *MemoryStore) Write(selector []string, ts int64, metrics []Metric) error {
	for i := range metrics {
		entry := &metrics[i]
		if entry.MetricConfig.Frequency != 0 {
			continue
		}
		// A map miss yields the zero MetricConfig, i.e. Frequency stays 0.
		entry.MetricConfig = m.Metrics[entry.Name]
	}

	return m.WriteToLevel(&m.root, selector, ts, metrics)
}
// GetLevel resolves `selector` starting at the root via findLevelOrCreate and
// returns the resulting level. Judging by the helper's name, levels that do
// not exist yet are presumably created on the way.
func (m *MemoryStore) GetLevel(selector []string) *Level {
	metricCount := len(m.Metrics)
	return m.root.findLevelOrCreate(selector, metricCount)
}
// WriteToLevel writes `metrics` for timestamp `ts` into the level found by
// resolving `selector` relative to `l`. The target level is locked for the
// whole write.
//
// The MetricConfig of every metric is assumed to be filled in already;
// metrics whose Frequency is zero are silently skipped. The first write error
// aborts the call and is returned; metrics written before it stay written.
func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metrics []Metric) error {
	target := l.findLevelOrCreate(selector, len(m.Metrics))

	target.lock.Lock()
	defer target.lock.Unlock()

	for i := range metrics {
		cfg := metrics[i].MetricConfig
		if cfg.Frequency == 0 {
			continue
		}

		slot := cfg.Offset
		head := target.metrics[slot]
		if head == nil {
			// Nothing has been written for this metric at this level yet.
			head = newBuffer(ts, cfg.Frequency)
			target.metrics[slot] = head
		}

		next, err := head.write(ts, metrics[i].Value)
		if err != nil {
			return err
		}

		// The write may hand back a different buffer, which then becomes
		// the one stored for this metric.
		if next != head {
			target.metrics[slot] = next
		}
	}

	return nil
}
// Returns all values for metric `metric` from `from` to `to` for the selected level(s).
// If the level does not hold the metric itself, the data will be aggregated recursively from the children.
// The second and third return value are the actual from/to for the data. Those can be different from
// the range asked for if no data was available.
2024-08-21 09:47:16 +02:00
func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]util.Float, int64, int64, int64, error) {
if from > to {
2024-08-21 09:47:16 +02:00
return nil, 0, 0, 0, errors.New("invalid time range")
}
minfo, ok := m.Metrics[metric]
if !ok {
2024-08-21 09:47:16 +02:00
return nil, 0, 0, 0, errors.New("unkown metric: " + metric)
}
n, data := 0, make([]util.Float, (to-from)/minfo.Frequency+1)
2024-08-21 09:47:16 +02:00
err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error {
cdata, cfrom, cto, err := b.read(from, to, data)
if err != nil {
return err
}
if n == 0 {
from, to = cfrom, cto
} else if from != cfrom || to != cto || len(data) != len(cdata) {
missingfront, missingback := int((from-cfrom)/minfo.Frequency), int((to-cto)/minfo.Frequency)
if missingfront != 0 {
return ErrDataDoesNotAlign
}
newlen := len(cdata) - missingback
if newlen < 1 {
return ErrDataDoesNotAlign
}
cdata = cdata[0:newlen]
if len(cdata) != len(data) {
return ErrDataDoesNotAlign
}
from, to = cfrom, cto
}
data = cdata
n += 1
return nil
})
if err != nil {
2024-08-21 09:47:16 +02:00
return nil, 0, 0, 0, err
} else if n == 0 {
2024-08-21 09:47:16 +02:00
return nil, 0, 0, 0, errors.New("metric or host not found")
} else if n > 1 {
if minfo.Aggregation == config.AvgAggregation {
normalize := 1. / util.Float(n)
for i := 0; i < len(data); i++ {
data[i] *= normalize
}
} else if minfo.Aggregation != config.SumAggregation {
2024-08-21 09:47:16 +02:00
return nil, 0, 0, 0, errors.New("invalid aggregation")
}
}
2024-08-21 09:47:16 +02:00
data, resolution, err = resampler.LargestTriangleThreeBucket(data, minfo.Frequency, resolution)
if err != nil {
return nil, 0, 0, 0, err
}
return data, from, to, resolution, nil
}
// Free releases all buffers of the selected level and of all its children
// that contain only values older than `t`. It returns whatever the level's
// free method reports: a count and an error.
func (m *MemoryStore) Free(selector []string, t int64) (int, error) {
	lvl := m.GetLevel(selector)
	return lvl.free(t)
}
// FreeAll drops every child of the root level, and with them everything
// stored below. It always returns nil.
//
// The root's lock is held while the children map is emptied, matching how
// the other methods of MemoryStore guard access to a level's children.
func (m *MemoryStore) FreeAll() error {
	m.root.lock.Lock()
	defer m.root.lock.Unlock()

	for k := range m.root.children {
		delete(m.root.children, k)
	}

	return nil
}
// SizeInBytes returns the size reported by the root level's sizeInBytes.
func (m *MemoryStore) SizeInBytes() int64 {
	total := m.root.sizeInBytes()
	return total
}
// ListChildren returns the names of all children of the level addressed by
// `selector`, in no particular order. It returns nil when any element of the
// selector does not name an existing level; no level is created.
func (m *MemoryStore) ListChildren(selector []string) []string {
	cur := &m.root
	for _, name := range selector {
		// Hold each level's read lock only for the lookup of the next one.
		cur.lock.RLock()
		child := cur.children[name]
		cur.lock.RUnlock()

		if child == nil {
			return nil
		}
		cur = child
	}

	cur.lock.RLock()
	defer cur.lock.RUnlock()

	names := make([]string, 0, len(cur.children))
	for name := range cur.children {
		names = append(names, name)
	}
	return names
}