Improve startup time by reading data in parallel

This commit is contained in:
Lou Knauer 2021-12-01 12:30:01 +01:00
parent 0219c48e78
commit 61f9056781
2 changed files with 69 additions and 3 deletions

View File

@ -11,9 +11,12 @@ import (
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic"
) )
type CheckpointMetrics struct { type CheckpointMetrics struct {
@ -163,9 +166,72 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
// Metrics stored at the lowest 2 levels are not loaded (root and cluster)! // Metrics stored at the lowest 2 levels are not loaded (root and cluster)!
// This function can only be called once and before the very first write or read. // This function can only be called once and before the very first write or read.
// Unlike ToCheckpoint, this function is NOT thread-safe. // Different host's data is loaded to memory in parallel.
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
return m.root.fromCheckpoint(dir, from, m) workers := runtime.NumCPU() / 2
work := make(chan [2]string, workers)
errs := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(workers + 1)
go func() {
defer close(work)
defer wg.Done()
clustersDir, err := os.ReadDir(dir)
if err != nil {
errs <- err
return
}
for _, clusterDir := range clustersDir {
if !clusterDir.IsDir() {
errs <- errors.New("expected only directories at first level of checkpoints/ directory")
return
}
hostsDir, err := os.ReadDir(filepath.Join(dir, clusterDir.Name()))
if err != nil {
errs <- err
return
}
for _, hostDir := range hostsDir {
if !hostDir.IsDir() {
errs <- errors.New("expected only directories at second level of checkpoints/ directory")
return
}
work <- [2]string{clusterDir.Name(), hostDir.Name()}
}
}
}()
loadedFiles := int32(0)
for worker := 0; worker < workers; worker++ {
go func() {
defer wg.Done()
for host := range work {
lvl := m.root.findLevelOrCreate(host[:], len(m.metrics))
n, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m)
if err != nil {
errs <- err
return
}
atomic.AddInt32(&loadedFiles, int32(n))
}
}()
}
wg.Wait()
var err error = nil
select {
case e := <-errs:
err = e
default:
}
return int(loadedFiles), err
} }
func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error { func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error {

View File

@ -220,7 +220,7 @@ func main() {
if err != nil { if err != nil {
log.Fatalf("Loading checkpoints failed: %s\n", err.Error()) log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
} else { } else {
log.Printf("Checkpoints loaded (%d files)\n", files) log.Printf("Checkpoints loaded (%d files, that took %dms)\n", files, time.Since(startupTime).Milliseconds())
} }
ctx, shutdown := context.WithCancel(context.Background()) ctx, shutdown := context.WithCancel(context.Background())