Refactor memorystore

Fix issues
Add unit test
Add documentation
2025-11-19 16:58:02 +01:00
parent 340efd7926
commit dd63e7157a
7 changed files with 255 additions and 170 deletions

@@ -28,6 +28,17 @@ import (
 	"github.com/linkedin/goavro/v2"
 )
+// File operation constants
+const (
+	// CheckpointFilePerms defines default permissions for checkpoint files
+	CheckpointFilePerms = 0o644
+	// CheckpointDirPerms defines default permissions for checkpoint directories
+	CheckpointDirPerms = 0o755
+	// GCTriggerInterval determines how often GC is forced during checkpoint loading
+	// GC is triggered every GCTriggerInterval*NumWorkers loaded hosts
+	GCTriggerInterval = 100
+)
 // Whenever changed, update MarshalJSON as well!
 type CheckpointMetrics struct {
 	Data []schema.Float `json:"data"`
@@ -171,9 +182,9 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
 	n, errs := int32(0), int32(0)
 	var wg sync.WaitGroup
-	wg.Add(NumWorkers)
-	work := make(chan workItem, NumWorkers*2)
-	for worker := 0; worker < NumWorkers; worker++ {
+	wg.Add(Keys.NumWorkers)
+	work := make(chan workItem, Keys.NumWorkers*2)
+	for worker := 0; worker < Keys.NumWorkers; worker++ {
 		go func() {
 			defer wg.Done()
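
ToCheckpoint fans the per-host work out to a fixed pool of goroutines that drain a buffered channel; the change above only swaps the package-level NumWorkers for the configured Keys.NumWorkers. A minimal, self-contained sketch of that pattern, with cfgNumWorkers and the string payload as placeholders rather than names from this repository:

// Sketch only: cfgNumWorkers plays the role of Keys.NumWorkers,
// and the workers just count the hosts they receive.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	const cfgNumWorkers = 4

	var wg sync.WaitGroup
	var processed atomic.Int32
	work := make(chan string, cfgNumWorkers*2) // buffered so the producer rarely blocks

	wg.Add(cfgNumWorkers)
	for worker := 0; worker < cfgNumWorkers; worker++ {
		go func() {
			defer wg.Done()
			for host := range work { // each worker drains the shared channel
				_ = host // a real worker would checkpoint this host
				processed.Add(1)
			}
		}()
	}

	for _, host := range []string{"node01", "node02", "node03"} {
		work <- host
	}
	close(work) // lets the workers' range loops finish
	wg.Wait()

	fmt.Println("hosts processed:", processed.Load())
}
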
@@ -205,7 +216,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
 	wg.Wait()
 	if errs > 0 {
-		return int(n), fmt.Errorf("[METRICSTORE]> %d errors happend while creating checkpoints (%d successes)", errs, n)
+		return int(n), fmt.Errorf("[METRICSTORE]> %d errors happened while creating checkpoints (%d successes)", errs, n)
 	}
 	return int(n), nil
 }
@@ -285,11 +296,11 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
 	}
 	filepath := path.Join(dir, fmt.Sprintf("%d.json", from))
-	f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644)
+	f, err := os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms)
 	if err != nil && os.IsNotExist(err) {
-		err = os.MkdirAll(dir, 0o755)
+		err = os.MkdirAll(dir, CheckpointDirPerms)
 		if err == nil {
-			f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, 0o644)
+			f, err = os.OpenFile(filepath, os.O_CREATE|os.O_WRONLY, CheckpointFilePerms)
 		}
 	}
 	if err != nil {
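
The open call now uses the named permission constants and keeps the existing fallback: if the open fails because the directory is missing, create it with MkdirAll and retry. A small standalone sketch of that fallback under the same permission bits (writeCheckpoint and its payload are illustrative, not part of this codebase):

// Sketch only: writeCheckpoint mirrors the open/MkdirAll/retry fallback above;
// the constant names are lowercased stand-ins for the ones added in this commit.
package main

import (
	"fmt"
	"os"
	"path"
)

const (
	checkpointFilePerms = 0o644
	checkpointDirPerms  = 0o755
)

func writeCheckpoint(dir string, from int64, payload []byte) error {
	filename := path.Join(dir, fmt.Sprintf("%d.json", from))

	f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, checkpointFilePerms)
	if err != nil && os.IsNotExist(err) {
		// The parent directory is missing: create it, then retry the open.
		if err = os.MkdirAll(dir, checkpointDirPerms); err == nil {
			f, err = os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, checkpointFilePerms)
		}
	}
	if err != nil {
		return err
	}
	defer f.Close()

	_, err = f.Write(payload)
	return err
}

func main() {
	if err := writeCheckpoint("checkpoints/cluster/host", 1700000000, []byte("{}\n")); err != nil {
		fmt.Println("error:", err)
	}
}
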
@@ -307,11 +318,11 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
 func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) {
 	var wg sync.WaitGroup
-	work := make(chan [2]string, NumWorkers)
+	work := make(chan [2]string, Keys.NumWorkers)
 	n, errs := int32(0), int32(0)
-	wg.Add(NumWorkers)
-	for worker := 0; worker < NumWorkers; worker++ {
+	wg.Add(Keys.NumWorkers)
+	for worker := 0; worker < Keys.NumWorkers; worker++ {
 		go func() {
 			defer wg.Done()
 			for host := range work {
@@ -347,7 +358,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (
 		}
 		i++
-		if i%NumWorkers == 0 && i > 100 {
+		if i%Keys.NumWorkers == 0 && i > GCTriggerInterval {
 			// Forcing garbage collection runs here regulary during the loading of checkpoints
 			// will decrease the total heap size after loading everything back to memory is done.
 			// While loading data, the heap will grow fast, so the GC target size will double
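
The hard-coded 100 becomes GCTriggerInterval: once enough hosts have been loaded, a garbage collection is forced every Keys.NumWorkers hosts so the heap target does not keep doubling during the bulk load. A toy sketch of that trigger condition, with numWorkers and gcTriggerInterval standing in for the real configuration values:

// Sketch only: numWorkers and gcTriggerInterval mirror Keys.NumWorkers and
// GCTriggerInterval; the host list is fake.
package main

import (
	"fmt"
	"runtime"
)

func main() {
	const (
		numWorkers        = 4
		gcTriggerInterval = 100
	)

	hosts := make([]string, 1000) // stand-in for the host directories found on disk

	i := 0
	for range hosts {
		// ... hand the host to a worker here ...
		i++
		// Skip forcing GC for the first gcTriggerInterval hosts, then run it
		// every numWorkers hosts to keep the heap target from growing unchecked.
		if i%numWorkers == 0 && i > gcTriggerInterval {
			runtime.GC()
		}
	}
	fmt.Println("loaded", i, "hosts")
}
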
@@ -368,7 +379,7 @@ done:
 	}
 	if errs > 0 {
-		return int(n), fmt.Errorf("[METRICSTORE]> %d errors happend while creating checkpoints (%d successes)", errs, n)
+		return int(n), fmt.Errorf("[METRICSTORE]> %d errors happened while creating checkpoints (%d successes)", errs, n)
 	}
 	return int(n), nil
 }
@@ -379,7 +390,7 @@ done:
 func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
 	if _, err := os.Stat(dir); os.IsNotExist(err) {
 		// The directory does not exist, so create it using os.MkdirAll()
-		err := os.MkdirAll(dir, 0o755) // 0755 sets the permissions for the directory
+		err := os.MkdirAll(dir, CheckpointDirPerms) // CheckpointDirPerms sets the permissions for the directory
 		if err != nil {
 			cclog.Fatalf("[METRICSTORE]> Error creating directory: %#v\n", err)
 		}
@@ -464,7 +475,7 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error {
 	// Create a new OCF reader from the buffered reader
 	ocfReader, err := goavro.NewOCFReader(br)
 	if err != nil {
-		panic(err)
+		return fmt.Errorf("[METRICSTORE]> error creating OCF reader: %w", err)
 	}
 	metricsData := make(map[string]schema.FloatArray)
@@ -477,7 +488,7 @@ func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error {
 		record, ok := datum.(map[string]any)
 		if !ok {
-			panic("[METRICSTORE]> failed to assert datum as map[string]interface{}")
+			return fmt.Errorf("[METRICSTORE]> failed to assert datum as map[string]interface{}")
 		}
 		for key, value := range record {
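
loadAvroFile now returns errors instead of panicking, so a single corrupt Avro file fails that one load rather than terminating the process; wrapping with %w keeps the underlying cause inspectable by callers. A hedged sketch of the wrap-and-match pattern (decodeRecord and errBadDatum are invented for illustration):

// Sketch only: decodeRecord and errBadDatum are invented to show how a %w-wrapped
// error stays matchable with errors.Is after the panic is removed.
package main

import (
	"errors"
	"fmt"
)

var errBadDatum = errors.New("failed to assert datum as map[string]interface{}")

func decodeRecord() (map[string]any, error) {
	// Pretend the OCF reader returned something that is not a map.
	return nil, fmt.Errorf("[METRICSTORE]> error reading record: %w", errBadDatum)
}

func main() {
	if _, err := decodeRecord(); err != nil {
		fmt.Println(err)                                        // full, wrapped message
		fmt.Println("bad datum:", errors.Is(err, errBadDatum)) // true: the cause is preserved
	}
}
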
@@ -559,7 +570,7 @@ func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray schem
 		l.metrics[minfo.offset] = b
 	} else {
 		if prev.start > b.start {
-			return errors.New("wooops")
+			return fmt.Errorf("[METRICSTORE]> buffer start time %d is before previous buffer start %d", b.start, prev.start)
 		}
 		b.prev = prev
@@ -623,7 +634,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
 		l.metrics[minfo.offset] = b
 	} else {
 		if prev.start > b.start {
-			return errors.New("wooops")
+			return fmt.Errorf("[METRICSTORE]> buffer start time %d is before previous buffer start %d", b.start, prev.start)
 		}
 		b.prev = prev
@@ -700,13 +711,17 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension
 	loader := loaders[extension]
 	for _, filename := range files {
-		f, err := os.Open(path.Join(dir, filename))
-		if err != nil {
-			return filesLoaded, err
-		}
-		defer f.Close()
-		if err = loader(m, f, from); err != nil {
+		// Use a closure to ensure file is closed immediately after use
+		err := func() error {
+			f, err := os.Open(path.Join(dir, filename))
+			if err != nil {
+				return err
+			}
+			defer f.Close()
+			return loader(m, f, from)
+		}()
+		if err != nil {
 			return filesLoaded, err
 		}
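
A defer inside a loop only runs when the enclosing function returns, so the old code kept every checkpoint file open until fromCheckpoint finished. The immediately invoked closure above scopes the defer to one iteration. A self-contained sketch of the idiom, with loadOne and loadAll as stand-ins for the real loader and caller:

// Sketch only: loadOne and loadAll stand in for the real loader and its caller;
// the point is that f.Close() runs at the end of each iteration, not when the
// outer function eventually returns.
package main

import (
	"fmt"
	"io"
	"os"
)

func loadOne(r io.Reader) error {
	_, err := io.Copy(io.Discard, r) // pretend to parse a checkpoint file
	return err
}

func loadAll(filenames []string) (int, error) {
	loaded := 0
	for _, name := range filenames {
		err := func() error {
			f, err := os.Open(name)
			if err != nil {
				return err
			}
			defer f.Close() // runs when this closure returns, i.e. once per file

			return loadOne(f)
		}()
		if err != nil {
			return loaded, err
		}
		loaded++
	}
	return loaded, nil
}

func main() {
	n, err := loadAll([]string{"1700000000.json", "1700003600.json"})
	fmt.Println("files loaded:", n, "err:", err)
}
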