mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-09-12 19:53:00 +02:00
Migration SQL fix
This commit is contained in:
@@ -2,16 +2,18 @@ package memorystore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/avro"
|
||||
"github.com/ClusterCockpit/cc-backend/internal/avro"
|
||||
"github.com/ClusterCockpit/cc-lib/resampler"
|
||||
"github.com/ClusterCockpit/cc-lib/schema"
|
||||
"github.com/ClusterCockpit/cc-lib/util"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/config"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -29,20 +31,101 @@ func init() {
|
||||
}
|
||||
}
|
||||
|
||||
// For aggregation over multiple values at different cpus/sockets/..., not time!
|
||||
type AggregationStrategy int
|
||||
|
||||
const (
|
||||
NoAggregation AggregationStrategy = iota
|
||||
SumAggregation
|
||||
AvgAggregation
|
||||
)
|
||||
|
||||
func (as *AggregationStrategy) UnmarshalJSON(data []byte) error {
|
||||
var str string
|
||||
if err := json.Unmarshal(data, &str); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch str {
|
||||
case "":
|
||||
*as = NoAggregation
|
||||
case "sum":
|
||||
*as = SumAggregation
|
||||
case "avg":
|
||||
*as = AvgAggregation
|
||||
default:
|
||||
return fmt.Errorf("invalid aggregation strategy: %#v", str)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type MetricConfig struct {
|
||||
// Interval in seconds at which measurements will arive.
|
||||
Frequency int64 `json:"frequency"`
|
||||
|
||||
// Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy.
|
||||
Aggregation AggregationStrategy `json:"aggregation"`
|
||||
|
||||
// Private, used internally...
|
||||
Offset int
|
||||
}
|
||||
|
||||
type Metric struct {
|
||||
Name string
|
||||
Value util.Float
|
||||
MetricConfig config.MetricConfig
|
||||
MetricConfig MetricConfig
|
||||
}
|
||||
|
||||
type MemoryStore struct {
|
||||
Metrics map[string]config.MetricConfig
|
||||
Metrics map[string]MetricConfig
|
||||
root Level
|
||||
}
|
||||
|
||||
func Init() {
|
||||
startupTime := time.Now()
|
||||
|
||||
//Pass the keys from cluster config
|
||||
InitMetrics()
|
||||
|
||||
ms := GetMemoryStore()
|
||||
|
||||
d, err := time.ParseDuration(Keys.Checkpoints.Restore)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
restoreFrom := startupTime.Add(-d)
|
||||
log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
|
||||
files, err := ms.FromCheckpointFiles(Keys.Checkpoints.RootDir, restoreFrom.Unix())
|
||||
loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB
|
||||
if err != nil {
|
||||
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
|
||||
} else {
|
||||
log.Printf("Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds())
|
||||
}
|
||||
|
||||
// Try to use less memory by forcing a GC run here and then
|
||||
// lowering the target percentage. The default of 100 means
|
||||
// that only once the ratio of new allocations execeds the
|
||||
// previously active heap, a GC is triggered.
|
||||
// Forcing a GC here will set the "previously active heap"
|
||||
// to a minumum.
|
||||
runtime.GC()
|
||||
|
||||
ctx, _ := context.WithCancel(context.Background())
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(4)
|
||||
|
||||
Retention(&wg, ctx)
|
||||
Checkpointing(&wg, ctx)
|
||||
Archiving(&wg, ctx)
|
||||
avro.DataStaging(&wg, ctx)
|
||||
}
|
||||
|
||||
// Create a new, initialized instance of a MemoryStore.
|
||||
// Will panic if values in the metric configurations are invalid.
|
||||
func Init(metrics map[string]config.MetricConfig) {
|
||||
func InitMetrics(metrics map[string]MetricConfig) {
|
||||
singleton.Do(func() {
|
||||
offset := 0
|
||||
for key, cfg := range metrics {
|
||||
@@ -50,7 +133,7 @@ func Init(metrics map[string]config.MetricConfig) {
|
||||
panic("invalid frequency")
|
||||
}
|
||||
|
||||
metrics[key] = config.MetricConfig{
|
||||
metrics[key] = MetricConfig{
|
||||
Frequency: cfg.Frequency,
|
||||
Aggregation: cfg.Aggregation,
|
||||
Offset: offset,
|
||||
@@ -77,16 +160,16 @@ func GetMemoryStore() *MemoryStore {
|
||||
}
|
||||
|
||||
func Shutdown() {
|
||||
log.Printf("Writing to '%s'...\n", config.Keys.Checkpoints.RootDir)
|
||||
log.Printf("Writing to '%s'...\n", Keys.Checkpoints.RootDir)
|
||||
var files int
|
||||
var err error
|
||||
|
||||
ms := GetMemoryStore()
|
||||
|
||||
if config.Keys.Checkpoints.FileFormat == "json" {
|
||||
files, err = ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
|
||||
if Keys.Checkpoints.FileFormat == "json" {
|
||||
files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
|
||||
} else {
|
||||
files, err = avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, true)
|
||||
files, err = avro.GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, true)
|
||||
close(avro.LineProtocolMessages)
|
||||
}
|
||||
|
||||
@@ -172,7 +255,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
d, err := time.ParseDuration(config.Keys.RetentionInMemory)
|
||||
d, err := time.ParseDuration(Keys.RetentionInMemory)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -261,7 +344,7 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric
|
||||
// If the level does not hold the metric itself, the data will be aggregated recursively from the children.
|
||||
// The second and third return value are the actual from/to for the data. Those can be different from
|
||||
// the range asked for if no data was available.
|
||||
func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]util.Float, int64, int64, int64, error) {
|
||||
func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]schema.Float, int64, int64, int64, error) {
|
||||
if from > to {
|
||||
return nil, 0, 0, 0, errors.New("invalid time range")
|
||||
}
|
||||
@@ -271,7 +354,7 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso
|
||||
return nil, 0, 0, 0, errors.New("unkown metric: " + metric)
|
||||
}
|
||||
|
||||
n, data := 0, make([]util.Float, (to-from)/minfo.Frequency+1)
|
||||
n, data := 0, make([]schema.Float, (to-from)/minfo.Frequency+1)
|
||||
|
||||
err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error {
|
||||
cdata, cfrom, cto, err := b.read(from, to, data)
|
||||
@@ -309,12 +392,12 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso
|
||||
} else if n == 0 {
|
||||
return nil, 0, 0, 0, errors.New("metric or host not found")
|
||||
} else if n > 1 {
|
||||
if minfo.Aggregation == config.AvgAggregation {
|
||||
normalize := 1. / util.Float(n)
|
||||
if minfo.Aggregation == AvgAggregation {
|
||||
normalize := 1. / schema.Float(n)
|
||||
for i := 0; i < len(data); i++ {
|
||||
data[i] *= normalize
|
||||
}
|
||||
} else if minfo.Aggregation != config.SumAggregation {
|
||||
} else if minfo.Aggregation != SumAggregation {
|
||||
return nil, 0, 0, 0, errors.New("invalid aggregation")
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user