Review and refactor

2025-12-16 09:45:48 +01:00
parent 14f1192ccb
commit 5e2cbd75fa
12 changed files with 108 additions and 244 deletions

View File

@@ -6,12 +6,18 @@
package memorystore
import (
"errors"
"math"
"github.com/ClusterCockpit/cc-lib/schema"
"github.com/ClusterCockpit/cc-lib/util"
)
var (
ErrInvalidTimeRange = errors.New("[METRICSTORE]> invalid time range: 'from' must be before 'to'")
ErrEmptyCluster = errors.New("[METRICSTORE]> cluster name cannot be empty")
)
type APIMetricData struct {
Error *string `json:"error,omitempty"`
Data schema.FloatArray `json:"data,omitempty"`
@@ -109,10 +115,14 @@ func (data *APIMetricData) PadDataWithNull(ms *MemoryStore, from, to int64, metr
}
func FetchData(req APIQueryRequest) (*APIQueryResponse, error) {
req.WithData = true
req.WithData = true
req.WithData = true
if req.From > req.To {
return nil, ErrInvalidTimeRange
}
if req.Cluster == "" && req.ForAllNodes != nil {
return nil, ErrEmptyCluster
}
req.WithData = true
ms := GetMemoryStore()
response := APIQueryResponse{
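
The old triple `req.WithData = true` collapses to a single assignment, and the new guard clauses turn malformed queries into typed sentinel errors before they reach the store. A minimal caller-side sketch (the import path and cluster name are assumptions; field names are as they appear in the diff):

```go
package main

import (
	"errors"
	"fmt"

	// Import path assumed; adjust to wherever memorystore lives in the module.
	"github.com/ClusterCockpit/cc-backend/internal/memorystore"
)

func main() {
	req := memorystore.APIQueryRequest{
		Cluster: "testcluster", // hypothetical cluster name
		From:    1700003600,
		To:      1700000000, // From > To deliberately trips the new check
	}
	if _, err := memorystore.FetchData(req); err != nil {
		// Sentinel errors let callers map validation failures to a
		// client-side error (e.g. HTTP 400) without string matching.
		if errors.Is(err, memorystore.ErrInvalidTimeRange) {
			fmt.Println("bad request:", err)
		}
	}
}
```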

View File

@@ -32,17 +32,14 @@ func Archiving(wg *sync.WaitGroup, ctx context.Context) {
return
}
ticks := func() <-chan time.Time {
if d <= 0 {
return nil
}
return time.NewTicker(d).C
}()
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticks:
case <-ticker.C:
t := time.Now().Add(-d)
cclog.Infof("[METRICSTORE]> start archiving checkpoints (older than %s)...", t.Format(time.RFC3339))
n, err := ArchiveCheckpoints(Keys.Checkpoints.RootDir,
@@ -165,23 +162,31 @@ func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead
n := 0
for _, checkpoint := range files {
// Use closure to ensure file is closed immediately after use,
// avoiding file descriptor leak from defer in loop
err := func() error {
filename := filepath.Join(dir, checkpoint)
r, err := os.Open(filename)
if err != nil {
return n, err
return err
}
defer r.Close()
w, err := zw.Create(checkpoint)
if err != nil {
return n, err
return err
}
if _, err = io.Copy(w, r); err != nil {
return n, err
return err
}
if err = os.Remove(filename); err != nil {
return err
}
return nil
}()
if err != nil {
return n, err
}
n += 1
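
The fix wraps the per-file work in an immediately invoked closure so that `defer r.Close()` runs at the end of each iteration instead of piling up until the function returns. A self-contained sketch of the pattern (names are illustrative):

```go
package example

import (
	"io"
	"os"
)

// copyAll copies every named file into dst. The per-iteration closure
// scopes the defer, so each descriptor is released before the next file
// is opened; a plain defer inside the loop would keep every file open
// until copyAll returns, leaking descriptors on large directories.
func copyAll(dst io.Writer, names []string) error {
	for _, name := range names {
		if err := func() error {
			f, err := os.Open(name)
			if err != nil {
				return err
			}
			defer f.Close() // runs when this closure returns
			_, err = io.Copy(dst, f)
			return err
		}(); err != nil {
			return err
		}
	}
	return nil
}
```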

View File

@@ -24,9 +24,8 @@ import (
"github.com/linkedin/goavro/v2"
)
var NumAvroWorkers int = 4
var NumAvroWorkers int = DefaultAvroWorkers
var startUp bool = true
var ErrNoNewData error = errors.New("no data in the pool")
func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) {
levels := make([]*AvroLevel, 0)
@@ -464,19 +463,15 @@ func generateRecord(data map[string]schema.Float) map[string]any {
}
func correctKey(key string) string {
// Replace any invalid characters in the key
// For example, replace spaces with underscores
key = strings.ReplaceAll(key, ":", "___")
key = strings.ReplaceAll(key, ".", "__")
key = strings.ReplaceAll(key, "_", "_0x5F_")
key = strings.ReplaceAll(key, ":", "_0x3A_")
key = strings.ReplaceAll(key, ".", "_0x2E_")
return key
}
func ReplaceKey(key string) string {
// Replace any invalid characters in the key
// For example, replace spaces with underscores
key = strings.ReplaceAll(key, "___", ":")
key = strings.ReplaceAll(key, "__", ".")
key = strings.ReplaceAll(key, "_0x2E_", ".")
key = strings.ReplaceAll(key, "_0x3A_", ":")
key = strings.ReplaceAll(key, "_0x5F_", "_")
return key
}
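
The old scheme was lossy: decoding mapped every `__` back to `.` and `___` to `:`, so a key that legitimately contained doubled underscores could not round-trip. Escaping the underscore itself first makes the common cases invertible. A sketch mirroring `correctKey`/`ReplaceKey` (keys whose original text contains a literal escape token such as `0x2E` remain an edge case):

```go
package example

import "strings"

// encodeKey mirrors correctKey: '_' must be escaped first, before the
// escape tokens (which themselves contain underscores) are introduced.
func encodeKey(key string) string {
	key = strings.ReplaceAll(key, "_", "_0x5F_")
	key = strings.ReplaceAll(key, ":", "_0x3A_")
	key = strings.ReplaceAll(key, ".", "_0x2E_")
	return key
}

// decodeKey mirrors ReplaceKey: literal underscores are restored last.
func decodeKey(key string) string {
	key = strings.ReplaceAll(key, "_0x2E_", ".")
	key = strings.ReplaceAll(key, "_0x3A_", ":")
	key = strings.ReplaceAll(key, "_0x5F_", "_")
	return key
}

// Round trip: "cpu_load.node:0" -> "cpu_0x5F_load_0x2E_node_0x3A_0"
// -> "cpu_load.node:0". Under the old scheme, a key like "mem__bw"
// decoded to "mem.bw" even though no '.' was ever encoded.
```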

View File

@@ -42,7 +42,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
metricName := ""
for _, selectorName := range val.Selector {
metricName += selectorName + Delimiter
metricName += selectorName + SelectorDelimiter
}
metricName += val.MetricName
@@ -54,7 +54,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
var selector []string
selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10))
if !testEq(oldSelector, selector) {
if !stringSlicesEqual(oldSelector, selector) {
// Get the Avro level for the metric
avroLevel = avroStore.root.findAvroLevelOrCreate(selector)
@@ -71,7 +71,7 @@ func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
}()
}
func testEq(a, b []string) bool {
func stringSlicesEqual(a, b []string) bool {
if len(a) != len(b) {
return false
}
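
The rename makes the helper self-describing. As an aside, `for range workers` elsewhere in this commit implies Go 1.22+, so the standard library already offers equivalent semantics (a possible follow-up, not part of this commit):

```go
package example

import "slices"

// Equivalent to stringSlicesEqual: length check plus element-wise
// comparison, returning false on the first mismatch.
func stringSlicesEqual(a, b []string) bool {
	return slices.Equal(a, b)
}
```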

View File

@@ -13,12 +13,11 @@ import (
var (
LineProtocolMessages = make(chan *AvroStruct)
Delimiter = "ZZZZZ"
// SelectorDelimiter separates hierarchical selector components in metric names for Avro encoding
SelectorDelimiter = "_SEL_"
)
// CheckpointBufferMinutes should always be in minutes.
// It controls the amount of data to hold for the given amount of time.
var CheckpointBufferMinutes = 3
var CheckpointBufferMinutes = DefaultCheckpointBufferMin
type AvroStruct struct {
MetricName string
@@ -73,7 +72,7 @@ func (l *AvroLevel) findAvroLevelOrCreate(selector []string) *AvroLevel {
}
}
// The level does not exist, take write lock for unqiue access:
// The level does not exist, take write lock for unique access:
l.lock.Lock()
// While this thread waited for the write lock, another thread
// could have created the child node.
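
`SelectorDelimiter` replaces the opaque `"ZZZZZ"` sentinel that separated selector components inside encoded metric names. A sketch of the convention as used by `DataStaging` (join) and `loadAvroFile` (split); the helper names here are illustrative, not from the source:

```go
package example

import "strings"

const SelectorDelimiter = "_SEL_"

// encodeMetricName prefixes the selector components onto the metric
// name, e.g. ["testcluster", "node42"] + "flops_any"
// -> "testcluster_SEL_node42_SEL_flops_any".
func encodeMetricName(selector []string, metric string) string {
	parts := append(append([]string{}, selector...), metric)
	return strings.Join(parts, SelectorDelimiter)
}

// decodeMetricName splits the name back into selector and metric,
// assuming neither side contains the delimiter itself.
func decodeMetricName(name string) (selector []string, metric string) {
	parts := strings.Split(name, SelectorDelimiter)
	return parts[:len(parts)-1], parts[len(parts)-1]
}
```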

View File

@@ -12,15 +12,12 @@ import (
"github.com/ClusterCockpit/cc-lib/schema"
)
// Default buffer capacity.
// `buffer.data` will only ever grow up to it's capacity and a new link
// BufferCap is the default buffer capacity.
// buffer.data will only ever grow up to its capacity and a new link
// in the buffer chain will be created if needed so that no copying
// of data or reallocation needs to happen on writes.
const (
BufferCap int = 512
)
const BufferCap int = DefaultBufferCapacity
// So that we can reuse allocations
var bufferPool sync.Pool = sync.Pool{
New: func() any {
return &buffer{
@@ -75,7 +72,6 @@ func (b *buffer) write(ts int64, value schema.Float) (*buffer, error) {
newbuf := newBuffer(ts, b.frequency)
newbuf.prev = b
b.next = newbuf
b.close()
b = newbuf
idx = 0
}
@@ -103,8 +99,6 @@ func (b *buffer) firstWrite() int64 {
return b.start + (b.frequency / 2)
}
func (b *buffer) close() {}
// Return all known values from `from` to `to`. Gaps of information are represented as NaN.
// Simple linear interpolation is done between the two neighboring cells if possible.
// If values at the start or end are missing, instead of NaN values, the second and third
@@ -139,8 +133,6 @@ func (b *buffer) read(from, to int64, data []schema.Float) ([]schema.Float, int6
data[i] += schema.NaN
} else if t < b.start {
data[i] += schema.NaN
// } else if b.data[idx].IsNaN() {
// data[i] += interpolate(idx, b.data)
} else {
data[i] += b.data[idx]
}
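
The removed `b.close()` was an empty method, so the chain logic is unchanged: when a link fills up, a new one is appended rather than growing or copying the old one. A pared-down sketch of that write path (the real buffer also indexes by timestamp and stores `schema.Float`):

```go
package example

// buffer is a simplified link in the chain; the real type also tracks
// start time, frequency and an archived flag.
type buffer struct {
	data       []float64
	prev, next *buffer
}

// write appends a value and returns the (possibly new) tail of the
// chain. Because a full link is never reallocated, readers holding
// older links are unaffected by new writes.
func (b *buffer) write(value float64, capacity int) *buffer {
	if len(b.data) == cap(b.data) {
		nb := &buffer{data: make([]float64, 0, capacity)}
		nb.prev, b.next = b, nb
		b = nb
	}
	b.data = append(b.data, value)
	return b
}
```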

View File

@@ -28,15 +28,10 @@ import (
"github.com/linkedin/goavro/v2"
)
// File operation constants
const (
// CheckpointFilePerms defines default permissions for checkpoint files
CheckpointFilePerms = 0o644
// CheckpointDirPerms defines default permissions for checkpoint directories
CheckpointDirPerms = 0o755
// GCTriggerInterval determines how often GC is forced during checkpoint loading
// GC is triggered every GCTriggerInterval*NumWorkers loaded hosts
GCTriggerInterval = 100
GCTriggerInterval = DefaultGCTriggerInterval
)
// Whenever changed, update MarshalJSON as well!
@@ -71,17 +66,14 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
return
}
ticks := func() <-chan time.Time {
if d <= 0 {
return nil
}
return time.NewTicker(d).C
}()
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticks:
case <-ticker.C:
cclog.Infof("[METRICSTORE]> start checkpointing (starting at %s)...", lastCheckpoint.Format(time.RFC3339))
now := time.Now()
n, err := ms.ToCheckpoint(Keys.Checkpoints.RootDir,
@@ -98,33 +90,23 @@ func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
} else {
go func() {
defer wg.Done()
d, _ := time.ParseDuration("1m")
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(CheckpointBufferMinutes) * time.Minute):
// This is the first tick until we have collected data for the given number of minutes.
GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false)
// log.Printf("Checkpointing %d avro files", count)
}
ticks := func() <-chan time.Time {
if d <= 0 {
return nil
}
return time.NewTicker(d).C
}()
ticker := time.NewTicker(DefaultAvroCheckpointInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticks:
// Regular ticks of 1 minute to write data.
case <-ticker.C:
GetAvroStore().ToCheckpoint(Keys.Checkpoints.RootDir, false)
// log.Printf("Checkpointing %d avro files", count)
}
}
}()
@@ -329,7 +311,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (
lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics))
nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from, extension)
if err != nil {
cclog.Fatalf("[METRICSTORE]> error while loading checkpoints: %s", err.Error())
cclog.Errorf("[METRICSTORE]> error while loading checkpoints for %s/%s: %s", host[0], host[1], err.Error())
atomic.AddInt32(&errs, 1)
}
atomic.AddInt32(&n, int32(nn))
@@ -506,8 +488,8 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error {
for key, floatArray := range metricsData {
metricName := ReplaceKey(key)
if strings.Contains(metricName, Delimiter) {
subString := strings.Split(metricName, Delimiter)
if strings.Contains(metricName, SelectorDelimiter) {
subString := strings.Split(metricName, SelectorDelimiter)
lvl := l
@@ -557,12 +539,10 @@ func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray schem
next: nil,
archived: true,
}
b.close()
minfo, ok := m.Metrics[metricName]
if !ok {
return nil
// return errors.New("Unkown metric: " + name)
}
prev := l.metrics[minfo.offset]
@@ -616,17 +596,15 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
b := &buffer{
frequency: metric.Frequency,
start: metric.Start,
data: metric.Data[0:n:n], // Space is wasted here :(
data: metric.Data[0:n:n],
prev: nil,
next: nil,
archived: true,
}
b.close()
minfo, ok := m.Metrics[name]
if !ok {
continue
// return errors.New("Unkown metric: " + name)
}
prev := l.metrics[minfo.offset]
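
All three background loops (archiving, checkpointing, retention) now share the same shape: a plain `time.Ticker` with a deferred `Stop`, selected against the context. The old nil-channel trick silently blocked forever when the interval was non-positive; validating up front is explicit, and it matters because `time.NewTicker` panics on a non-positive duration. A condensed sketch of the shared pattern:

```go
package example

import (
	"context"
	"time"
)

// runPeriodic runs task every d until ctx is cancelled. d must be
// positive: time.NewTicker panics otherwise, which is why the new
// code checks the interval before constructing the ticker.
func runPeriodic(ctx context.Context, d time.Duration, task func()) {
	if d <= 0 {
		return
	}
	ticker := time.NewTicker(d)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			task()
		}
	}
}
```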

View File

@@ -7,6 +7,16 @@ package memorystore
import (
"fmt"
"time"
)
const (
DefaultMaxWorkers = 10
DefaultBufferCapacity = 512
DefaultGCTriggerInterval = 100
DefaultAvroWorkers = 4
DefaultCheckpointBufferMin = 3
DefaultAvroCheckpointInterval = time.Minute
)
var InternalCCMSFlag bool = false
@@ -31,20 +41,6 @@ type MetricStoreConfig struct {
RootDir string `json:"directory"`
DeleteInstead bool `json:"delete-instead"`
} `json:"archive"`
Nats []*NatsConfig `json:"nats"`
}
type NatsConfig struct {
// Address of the nats server
Address string `json:"address"`
// Username/Password, optional
Username string `json:"username"`
Password string `json:"password"`
// Creds file path
Credsfilepath string `json:"creds-file-path"`
Subscriptions []struct {
// Channel name
SubscribeTo string `json:"subscribe-to"`

View File

@@ -46,7 +46,7 @@ func (l *Level) findLevelOrCreate(selector []string, nMetrics int) *Level {
}
}
// The level does not exist, take write lock for unqiue access:
// The level does not exist, take write lock for unique access:
l.lock.Lock()
// While this thread waited for the write lock, another thread
// could have created the child node.

View File

@@ -11,113 +11,31 @@ import (
"sync"
"time"
"github.com/ClusterCockpit/cc-backend/pkg/nats"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema"
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/nats-io/nats.go"
)
// Each connection is handled in its own goroutine. This is a blocking function.
// func ReceiveRaw(ctx context.Context,
// listener net.Listener,
// handleLine func(*lineprotocol.Decoder, string) error,
// ) error {
// var wg sync.WaitGroup
// wg.Add(1)
// go func() {
// defer wg.Done()
// <-ctx.Done()
// if err := listener.Close(); err != nil {
// log.Printf("listener.Close(): %s", err.Error())
// }
// }()
// for {
// conn, err := listener.Accept()
// if err != nil {
// if errors.Is(err, net.ErrClosed) {
// break
// }
// log.Printf("listener.Accept(): %s", err.Error())
// }
// wg.Add(2)
// go func() {
// defer wg.Done()
// defer conn.Close()
// dec := lineprotocol.NewDecoder(conn)
// connctx, cancel := context.WithCancel(context.Background())
// defer cancel()
// go func() {
// defer wg.Done()
// select {
// case <-connctx.Done():
// conn.Close()
// case <-ctx.Done():
// conn.Close()
// }
// }()
// if err := handleLine(dec, "default"); err != nil {
// if errors.Is(err, net.ErrClosed) {
// return
// }
// log.Printf("%s: %s", conn.RemoteAddr().String(), err.Error())
// errmsg := make([]byte, 128)
// errmsg = append(errmsg, `error: `...)
// errmsg = append(errmsg, err.Error()...)
// errmsg = append(errmsg, '\n')
// conn.Write(errmsg)
// }
// }()
// }
// wg.Wait()
// return nil
// }
// ReceiveNats subscribes to the configured NATS subjects. This is a
// blocking function: DecodeLine is called for each line received, and
// cancelling the passed context terminates it gracefully.
func ReceiveNats(conf *(NatsConfig),
ms *MemoryStore,
func ReceiveNats(ms *MemoryStore,
workers int,
ctx context.Context,
) error {
var opts []nats.Option
if conf.Username != "" && conf.Password != "" {
opts = append(opts, nats.UserInfo(conf.Username, conf.Password))
}
if conf.Credsfilepath != "" {
opts = append(opts, nats.UserCredentials(conf.Credsfilepath))
}
nc, err := nats.Connect(conf.Address, opts...)
if err != nil {
return err
}
defer nc.Close()
nc := nats.GetClient()
var wg sync.WaitGroup
var subs []*nats.Subscription
msgs := make(chan *nats.Msg, workers*2)
msgs := make(chan []byte, workers*2)
for _, sc := range conf.Subscriptions {
for _, sc := range Keys.Subscriptions {
clusterTag := sc.ClusterTag
var sub *nats.Subscription
if workers > 1 {
wg.Add(workers)
for range workers {
go func() {
for m := range msgs {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
dec := lineprotocol.NewDecoderWithBytes(m)
if err := DecodeLine(dec, ms, clusterTag); err != nil {
cclog.Errorf("error: %s", err.Error())
}
@@ -127,37 +45,24 @@ func ReceiveNats(conf *(NatsConfig),
}()
}
sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) {
msgs <- m
nc.Subscribe(sc.SubscribeTo, func(subject string, data []byte) {
msgs <- data
})
} else {
sub, err = nc.Subscribe(sc.SubscribeTo, func(m *nats.Msg) {
dec := lineprotocol.NewDecoderWithBytes(m.Data)
nc.Subscribe(sc.SubscribeTo, func(subject string, data []byte) {
dec := lineprotocol.NewDecoderWithBytes(data)
if err := DecodeLine(dec, ms, clusterTag); err != nil {
cclog.Errorf("error: %s", err.Error())
}
})
}
if err != nil {
return err
}
cclog.Infof("NATS subscription to '%s' on '%s' established", sc.SubscribeTo, conf.Address)
subs = append(subs, sub)
cclog.Infof("NATS subscription to '%s' established", sc.SubscribeTo)
}
<-ctx.Done()
for _, sub := range subs {
err = sub.Unsubscribe()
if err != nil {
cclog.Errorf("NATS unsubscribe failed: %s", err.Error())
}
}
close(msgs)
wg.Wait()
nc.Close()
cclog.Print("NATS connection closed")
return nil
}
@@ -266,8 +171,6 @@ func DecodeLine(dec *lineprotocol.Decoder,
case "stype-id":
subTypeBuf = append(subTypeBuf, val...)
default:
// Ignore unkown tags (cc-metric-collector might send us a unit for example that we do not need)
// return fmt.Errorf("unkown tag: '%s' (value: '%s')", string(key), string(val))
}
}
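
The refactored receiver delegates connection handling to the shared `nats` package and keeps only the fan-out: one subscription callback feeds a bounded channel that `workers` goroutines drain. A condensed sketch with the client abstracted behind a `subscribe` parameter (an assumption standing in for `nats.GetClient().Subscribe`):

```go
package example

import (
	"context"
	"sync"
)

// receive fans incoming payloads out to a fixed worker pool. The
// channel buffer of workers*2 absorbs short bursts without blocking
// the subscription callback.
func receive(ctx context.Context, workers int,
	subscribe func(handler func(data []byte)),
	decode func([]byte) error,
) {
	msgs := make(chan []byte, workers*2)
	var wg sync.WaitGroup
	wg.Add(workers)
	for range workers { // range-over-int requires Go 1.22+
		go func() {
			defer wg.Done()
			for m := range msgs {
				_ = decode(m) // the real loop logs decode errors
			}
		}()
	}
	subscribe(func(data []byte) { msgs <- data })
	<-ctx.Done()
	// Note: as in the diff, closing msgs before the subscription is
	// torn down assumes no further callbacks fire; a late message
	// would panic on send to a closed channel.
	close(msgs)
	wg.Wait()
}
```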

View File

@@ -44,8 +44,6 @@ var (
shutdownFunc context.CancelFunc
)
type Metric struct {
Name string
Value schema.Float
@@ -71,8 +69,7 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
// Set NumWorkers from config or use default
if Keys.NumWorkers <= 0 {
maxWorkers := 10
Keys.NumWorkers = min(runtime.NumCPU()/2+1, maxWorkers)
Keys.NumWorkers = min(runtime.NumCPU()/2+1, DefaultMaxWorkers)
}
cclog.Debugf("[METRICSTORE]> Using %d workers for checkpoint/archive operations\n", Keys.NumWorkers)
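
The magic `maxWorkers := 10` becomes the named `DefaultMaxWorkers`; the sizing heuristic itself is unchanged: roughly half the logical CPUs plus one, capped at the default, so checkpoint and archive I/O does not oversubscribe the machine. Extracted as a sketch:

```go
package example

import "runtime"

const DefaultMaxWorkers = 10

// numWorkers returns the configured worker count, or the heuristic
// min(NumCPU/2+1, DefaultMaxWorkers) when the config leaves it unset.
// The built-in min requires Go 1.21+.
func numWorkers(configured int) int {
	if configured > 0 {
		return configured
	}
	return min(runtime.NumCPU()/2+1, DefaultMaxWorkers)
}
```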
@@ -144,21 +141,10 @@ func Init(rawConfig json.RawMessage, wg *sync.WaitGroup) {
// Store the shutdown function for later use by Shutdown()
shutdownFunc = shutdown
if Keys.Nats != nil {
for _, natsConf := range Keys.Nats {
// TODO: When multiple nats configs share a URL, do a single connect.
wg.Add(1)
nc := natsConf
go func() {
// err := ReceiveNats(conf.Nats, decodeLine, runtime.NumCPU()-1, ctx)
err := ReceiveNats(nc, ms, 1, ctx)
err = ReceiveNats(ms, 1, ctx)
if err != nil {
cclog.Fatal(err)
}
wg.Done()
}()
}
}
}
// InitMetrics creates a new, initialized instance of a MemoryStore.
@@ -244,18 +230,18 @@ func Retention(wg *sync.WaitGroup, ctx context.Context) {
return
}
ticks := func() <-chan time.Time {
d := d / 2
if d <= 0 {
return nil
tickInterval := d / 2
if tickInterval <= 0 {
return
}
return time.NewTicker(d).C
}()
ticker := time.NewTicker(tickInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticks:
case <-ticker.C:
t := time.Now().Add(-d)
cclog.Infof("[METRICSTORE]> start freeing buffers (older than %s)...\n", t.Format(time.RFC3339))
freed, err := ms.Free(nil, t.Unix())
@@ -332,7 +318,7 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso
minfo, ok := m.Metrics[metric]
if !ok {
return nil, 0, 0, 0, errors.New("[METRICSTORE]> unkown metric: " + metric)
return nil, 0, 0, 0, errors.New("[METRICSTORE]> unknown metric: " + metric)
}
n, data := 0, make([]schema.Float, (to-from)/minfo.Frequency+1)

View File

@@ -77,7 +77,7 @@ func (m *MemoryStore) Stats(selector util.Selector, metric string, from, to int6
minfo, ok := m.Metrics[metric]
if !ok {
return nil, 0, 0, errors.New("unkown metric: " + metric)
return nil, 0, 0, errors.New("unknown metric: " + metric)
}
n, samples := 0, 0