Resampler implemented

This commit is contained in:
Aditya Ujeniya
2024-08-21 09:47:16 +02:00
parent 362adab938
commit b186dca79d
7 changed files with 351 additions and 56 deletions

View File

@@ -10,6 +10,7 @@ import (
"github.com/ClusterCockpit/cc-metric-store/internal/config"
"github.com/ClusterCockpit/cc-metric-store/internal/util"
"github.com/ClusterCockpit/cc-metric-store/pkg/resampler"
)
var (
@@ -178,17 +179,18 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric
// If the level does not hold the metric itself, the data will be aggregated recursively from the children.
// The second and third return value are the actual from/to for the data. Those can be different from
// the range asked for if no data was available.
func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64) ([]util.Float, int64, int64, error) {
func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]util.Float, int64, int64, int64, error) {
if from > to {
return nil, 0, 0, errors.New("invalid time range")
return nil, 0, 0, 0, errors.New("invalid time range")
}
minfo, ok := m.Metrics[metric]
if !ok {
return nil, 0, 0, errors.New("unkown metric: " + metric)
return nil, 0, 0, 0, errors.New("unkown metric: " + metric)
}
n, data := 0, make([]util.Float, (to-from)/minfo.Frequency+1)
err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error {
cdata, cfrom, cto, err := b.read(from, to, data)
if err != nil {
@@ -221,9 +223,9 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64
})
if err != nil {
return nil, 0, 0, err
return nil, 0, 0, 0, err
} else if n == 0 {
return nil, 0, 0, errors.New("metric or host not found")
return nil, 0, 0, 0, errors.New("metric or host not found")
} else if n > 1 {
if minfo.Aggregation == config.AvgAggregation {
normalize := 1. / util.Float(n)
@@ -231,11 +233,17 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64
data[i] *= normalize
}
} else if minfo.Aggregation != config.SumAggregation {
return nil, 0, 0, errors.New("invalid aggregation")
return nil, 0, 0, 0, errors.New("invalid aggregation")
}
}
return data, from, to, nil
data, resolution, err = resampler.LargestTriangleThreeBucket(data, minfo.Frequency, resolution)
if err != nil {
return nil, 0, 0, 0, err
}
return data, from, to, resolution, nil
}
// Release all buffers for the selected level and all its children that contain only