Combined metricstore api and functions

This commit is contained in:
Aditya Ujeniya
2025-09-08 11:29:27 +02:00
parent bca176170c
commit 62565b9ae2
26 changed files with 1248 additions and 430 deletions

View File

@@ -2,16 +2,19 @@ package memorystore
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
"github.com/ClusterCockpit/cc-backend/internal/avro"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-lib/resampler"
"github.com/ClusterCockpit/cc-lib/runtimeEnv"
"github.com/ClusterCockpit/cc-lib/schema"
"github.com/ClusterCockpit/cc-lib/util"
)
@@ -21,6 +24,8 @@ var (
msInstance *MemoryStore
)
var Clusters = make([]string, 0)
var NumWorkers int = 4
func init() {
@@ -31,77 +36,38 @@ func init() {
}
}
// For aggregation over multiple values at different cpus/sockets/..., not time!
type AggregationStrategy int
const (
NoAggregation AggregationStrategy = iota
SumAggregation
AvgAggregation
)
func (as *AggregationStrategy) UnmarshalJSON(data []byte) error {
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}
switch str {
case "":
*as = NoAggregation
case "sum":
*as = SumAggregation
case "avg":
*as = AvgAggregation
default:
return fmt.Errorf("invalid aggregation strategy: %#v", str)
}
return nil
}
type MetricConfig struct {
// Interval in seconds at which measurements will arive.
Frequency int64 `json:"frequency"`
// Can be 'sum', 'avg' or null. Describes how to aggregate metrics from the same timestep over the hierarchy.
Aggregation AggregationStrategy `json:"aggregation"`
// Private, used internally...
Offset int
}
type Metric struct {
Name string
Value util.Float
MetricConfig MetricConfig
Value schema.Float
MetricConfig config.MetricConfig
}
type MemoryStore struct {
Metrics map[string]MetricConfig
Metrics map[string]config.MetricConfig
root Level
}
func Init() {
func Init(wg sync.WaitGroup) {
startupTime := time.Now()
//Pass the keys from cluster config
InitMetrics()
//Pass the config.MetricStoreKeys
InitMetrics(config.Metrics)
ms := GetMemoryStore()
d, err := time.ParseDuration(Keys.Checkpoints.Restore)
d, err := time.ParseDuration(config.MetricStoreKeys.Checkpoints.Restore)
if err != nil {
log.Fatal(err)
}
restoreFrom := startupTime.Add(-d)
log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
files, err := ms.FromCheckpointFiles(Keys.Checkpoints.RootDir, restoreFrom.Unix())
log.Printf("[METRICSTORE]> Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
files, err := ms.FromCheckpointFiles(config.MetricStoreKeys.Checkpoints.RootDir, restoreFrom.Unix())
loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB
if err != nil {
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
log.Fatalf("[METRICSTORE]> Loading checkpoints failed: %s\n", err.Error())
} else {
log.Printf("Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds())
log.Printf("[METRICSTORE]> Checkpoints loaded (%d files, %d MB, that took %fs)\n", files, loadedData, time.Since(startupTime).Seconds())
}
// Try to use less memory by forcing a GC run here and then
@@ -112,28 +78,53 @@ func Init() {
// to a minumum.
runtime.GC()
ctx, _ := context.WithCancel(context.Background())
ctx, shutdown := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(4)
Retention(&wg, ctx)
Checkpointing(&wg, ctx)
Archiving(&wg, ctx)
avro.DataStaging(&wg, ctx)
wg.Add(1)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
defer wg.Done()
<-sigs
runtimeEnv.SystemdNotifiy(false, "[METRICSTORE]> Shutting down ...")
shutdown()
}()
if config.MetricStoreKeys.Nats != nil {
for _, natsConf := range config.MetricStoreKeys.Nats {
// TODO: When multiple nats configs share a URL, do a single connect.
wg.Add(1)
nc := natsConf
go func() {
// err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx)
err := ReceiveNats(nc, ms, 1, ctx)
if err != nil {
log.Fatal(err)
}
wg.Done()
}()
}
}
}
// Create a new, initialized instance of a MemoryStore.
// Will panic if values in the metric configurations are invalid.
func InitMetrics(metrics map[string]MetricConfig) {
func InitMetrics(metrics map[string]config.MetricConfig) {
singleton.Do(func() {
offset := 0
for key, cfg := range metrics {
if cfg.Frequency == 0 {
panic("invalid frequency")
panic("[METRICSTORE]> invalid frequency")
}
metrics[key] = MetricConfig{
metrics[key] = config.MetricConfig{
Frequency: cfg.Frequency,
Aggregation: cfg.Aggregation,
Offset: offset,
@@ -153,30 +144,30 @@ func InitMetrics(metrics map[string]MetricConfig) {
func GetMemoryStore() *MemoryStore {
if msInstance == nil {
log.Fatalf("MemoryStore not initialized!")
log.Fatalf("[METRICSTORE]> MemoryStore not initialized!")
}
return msInstance
}
func Shutdown() {
log.Printf("Writing to '%s'...\n", Keys.Checkpoints.RootDir)
log.Printf("[METRICSTORE]> Writing to '%s'...\n", config.MetricStoreKeys.Checkpoints.RootDir)
var files int
var err error
ms := GetMemoryStore()
if Keys.Checkpoints.FileFormat == "json" {
files, err = ms.ToCheckpoint(Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
if config.MetricStoreKeys.Checkpoints.FileFormat == "json" {
files, err = ms.ToCheckpoint(config.MetricStoreKeys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
} else {
files, err = avro.GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, true)
files, err = avro.GetAvroStore().ToCheckpoint(config.MetricStoreKeys.Checkpoints.RootDir, true)
close(avro.LineProtocolMessages)
}
if err != nil {
log.Printf("Writing checkpoint failed: %s\n", err.Error())
log.Printf("[METRICSTORE]> Writing checkpoint failed: %s\n", err.Error())
}
log.Printf("Done! (%d files written)\n", files)
log.Printf("[METRICSTORE]> Done! (%d files written)\n", files)
// ms.PrintHeirarchy()
}
@@ -255,7 +246,7 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
go func() {
defer wg.Done()
d, err := time.ParseDuration(Keys.RetentionInMemory)
d, err := time.ParseDuration(config.MetricStoreKeys.RetentionInMemory)
if err != nil {
log.Fatal(err)
}
@@ -276,12 +267,12 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
return
case <-ticks:
t := time.Now().Add(-d)
log.Printf("start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
log.Printf("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
freed, err := ms.Free(nil, t.Unix())
if err != nil {
log.Printf("freeing up buffers failed: %s\n", err.Error())
log.Printf("[METRICSTORE]> freeing up buffers failed: %s\n", err.Error())
} else {
log.Printf("done: %d buffers freed\n", freed)
log.Printf("[METRICSTORE]> done: %d buffers freed\n", freed)
}
}
}
@@ -346,12 +337,12 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric
// the range asked for if no data was available.
func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]schema.Float, int64, int64, int64, error) {
if from > to {
return nil, 0, 0, 0, errors.New("invalid time range")
return nil, 0, 0, 0, errors.New("[METRICSTORE]> invalid time range")
}
minfo, ok := m.Metrics[metric]
if !ok {
return nil, 0, 0, 0, errors.New("unkown metric: " + metric)
return nil, 0, 0, 0, errors.New("[METRICSTORE]> unkown metric: " + metric)
}
n, data := 0, make([]schema.Float, (to-from)/minfo.Frequency+1)
@@ -390,15 +381,15 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso
if err != nil {
return nil, 0, 0, 0, err
} else if n == 0 {
return nil, 0, 0, 0, errors.New("metric or host not found")
return nil, 0, 0, 0, errors.New("[METRICSTORE]> metric or host not found")
} else if n > 1 {
if minfo.Aggregation == AvgAggregation {
if minfo.Aggregation == config.AvgAggregation {
normalize := 1. / schema.Float(n)
for i := 0; i < len(data); i++ {
data[i] *= normalize
}
} else if minfo.Aggregation != SumAggregation {
return nil, 0, 0, 0, errors.New("invalid aggregation")
} else if minfo.Aggregation != config.SumAggregation {
return nil, 0, 0, 0, errors.New("[METRICSTORE]> invalid aggregation")
}
}