Trial and Test MetricStore components

This commit is contained in:
Aditya Ujeniya
2025-09-08 22:54:13 +02:00
parent 62565b9ae2
commit af43901ca3
34 changed files with 394 additions and 219 deletions

View File

@@ -380,7 +380,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
if err != nil {
log.Fatalf("[METRICSTORE]> Error creating directory: %#v\n", err)
}
fmt.Printf("[METRICSTORE]> %#v Directory created successfully.\n", dir)
log.Printf("[METRICSTORE]> %#v Directory created successfully.\n", dir)
}
// Config read (replace with your actual config read)

View File

@@ -2,10 +2,8 @@ package memorystore
import (
"context"
"errors"
"fmt"
"log"
"net"
"sync"
"time"
@@ -17,67 +15,67 @@ import (
)
// Each connection is handled in it's own goroutine. This is a blocking function.
func ReceiveRaw(ctx context.Context,
listener net.Listener,
handleLine func(*lineprotocol.Decoder, string) error,
) error {
var wg sync.WaitGroup
// func ReceiveRaw(ctx context.Context,
// listener net.Listener,
// handleLine func(*lineprotocol.Decoder, string) error,
// ) error {
// var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-ctx.Done()
if err := listener.Close(); err != nil {
log.Printf("listener.Close(): %s", err.Error())
}
}()
// wg.Add(1)
// go func() {
// defer wg.Done()
// <-ctx.Done()
// if err := listener.Close(); err != nil {
// log.Printf("listener.Close(): %s", err.Error())
// }
// }()
for {
conn, err := listener.Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
break
}
// for {
// conn, err := listener.Accept()
// if err != nil {
// if errors.Is(err, net.ErrClosed) {
// break
// }
log.Printf("listener.Accept(): %s", err.Error())
}
// log.Printf("listener.Accept(): %s", err.Error())
// }
wg.Add(2)
go func() {
defer wg.Done()
defer conn.Close()
// wg.Add(2)
// go func() {
// defer wg.Done()
// defer conn.Close()
dec := lineprotocol.NewDecoder(conn)
connctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer wg.Done()
select {
case <-connctx.Done():
conn.Close()
case <-ctx.Done():
conn.Close()
}
}()
// dec := lineprotocol.NewDecoder(conn)
// connctx, cancel := context.WithCancel(context.Background())
// defer cancel()
// go func() {
// defer wg.Done()
// select {
// case <-connctx.Done():
// conn.Close()
// case <-ctx.Done():
// conn.Close()
// }
// }()
if err := handleLine(dec, "default"); err != nil {
if errors.Is(err, net.ErrClosed) {
return
}
// if err := handleLine(dec, "default"); err != nil {
// if errors.Is(err, net.ErrClosed) {
// return
// }
log.Printf("%s: %s", conn.RemoteAddr().String(), err.Error())
errmsg := make([]byte, 128)
errmsg = append(errmsg, `error: `...)
errmsg = append(errmsg, err.Error()...)
errmsg = append(errmsg, '\n')
conn.Write(errmsg)
}
}()
}
// log.Printf("%s: %s", conn.RemoteAddr().String(), err.Error())
// errmsg := make([]byte, 128)
// errmsg = append(errmsg, `error: `...)
// errmsg = append(errmsg, err.Error()...)
// errmsg = append(errmsg, '\n')
// conn.Write(errmsg)
// }
// }()
// }
wg.Wait()
return nil
}
// wg.Wait()
// return nil
// }
// Connect to a nats server and subscribe to "updates". This is a blocking
// function. handleLine will be called for each line recieved via nats.
@@ -113,7 +111,7 @@ func ReceiveNats(conf *(config.NatsConfig),
if workers > 1 {
wg.Add(workers)
for i := 0; i < workers; i++ {
for range workers {
go func() {
for m := range msgs {
dec := lineprotocol.NewDecoderWithBytes(m.Data)

View File

@@ -47,7 +47,7 @@ type MemoryStore struct {
root Level
}
func Init(wg sync.WaitGroup) {
func Init(wg *sync.WaitGroup) {
startupTime := time.Now()
//Pass the config.MetricStoreKeys
@@ -82,10 +82,10 @@ func Init(wg sync.WaitGroup) {
wg.Add(4)
Retention(&wg, ctx)
Checkpointing(&wg, ctx)
Archiving(&wg, ctx)
avro.DataStaging(&wg, ctx)
Retention(wg, ctx)
Checkpointing(wg, ctx)
Archiving(wg, ctx)
avro.DataStaging(wg, ctx)
wg.Add(1)
sigs := make(chan os.Signal, 1)
@@ -337,12 +337,12 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric
// the range asked for if no data was available.
func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]schema.Float, int64, int64, int64, error) {
if from > to {
return nil, 0, 0, 0, errors.New("[METRICSTORE]> invalid time range")
return nil, 0, 0, 0, errors.New("[METRICSTORE]> invalid time range\n")
}
minfo, ok := m.Metrics[metric]
if !ok {
return nil, 0, 0, 0, errors.New("[METRICSTORE]> unkown metric: " + metric)
return nil, 0, 0, 0, errors.New("[METRICSTORE]> unkown metric: \n" + metric)
}
n, data := 0, make([]schema.Float, (to-from)/minfo.Frequency+1)
@@ -381,7 +381,7 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, reso
if err != nil {
return nil, 0, 0, 0, err
} else if n == 0 {
return nil, 0, 0, 0, errors.New("[METRICSTORE]> metric or host not found")
return nil, 0, 0, 0, errors.New("[METRICSTORE]> metric or host not found\n")
} else if n > 1 {
if minfo.Aggregation == config.AvgAggregation {
normalize := 1. / schema.Float(n)