From 61f90567814dbed2e2d5f6fd92270f87fc54d372 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Wed, 1 Dec 2021 12:30:01 +0100 Subject: [PATCH] Improve startup time by reading data in parallel --- archive.go | 70 +++++++++++++++++++++++++++++++++++++++++++++++-- metric-store.go | 2 +- 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/archive.go b/archive.go index bcd24cf..15ae399 100644 --- a/archive.go +++ b/archive.go @@ -11,9 +11,12 @@ import ( "os" "path" "path/filepath" + "runtime" "sort" "strconv" "strings" + "sync" + "sync/atomic" ) type CheckpointMetrics struct { @@ -163,9 +166,72 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { // Metrics stored at the lowest 2 levels are not loaded (root and cluster)! // This function can only be called once and before the very first write or read. -// Unlike ToCheckpoint, this function is NOT thread-safe. +// Different host's data is loaded to memory in parallel. func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { - return m.root.fromCheckpoint(dir, from, m) + workers := runtime.NumCPU() / 2 + work := make(chan [2]string, workers) + errs := make(chan error, 1) + var wg sync.WaitGroup + wg.Add(workers + 1) + + go func() { + defer close(work) + defer wg.Done() + + clustersDir, err := os.ReadDir(dir) + if err != nil { + errs <- err + return + } + + for _, clusterDir := range clustersDir { + if !clusterDir.IsDir() { + errs <- errors.New("expected only directories at first level of checkpoints/ directory") + return + } + + hostsDir, err := os.ReadDir(filepath.Join(dir, clusterDir.Name())) + if err != nil { + errs <- err + return + } + + for _, hostDir := range hostsDir { + if !hostDir.IsDir() { + errs <- errors.New("expected only directories at second level of checkpoints/ directory") + return + } + + work <- [2]string{clusterDir.Name(), hostDir.Name()} + } + } + }() + + loadedFiles := int32(0) + + for worker := 0; worker < workers; worker++ { + go func() { + defer wg.Done() + for host := range work { + lvl := m.root.findLevelOrCreate(host[:], len(m.metrics)) + n, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m) + if err != nil { + errs <- err + return + } + atomic.AddInt32(&loadedFiles, int32(n)) + } + }() + } + + wg.Wait() + var err error = nil + select { + case e := <-errs: + err = e + default: + } + return int(loadedFiles), err } func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error { diff --git a/metric-store.go b/metric-store.go index 8b0cc8d..3f68aaa 100644 --- a/metric-store.go +++ b/metric-store.go @@ -220,7 +220,7 @@ func main() { if err != nil { log.Fatalf("Loading checkpoints failed: %s\n", err.Error()) } else { - log.Printf("Checkpoints loaded (%d files)\n", files) + log.Printf("Checkpoints loaded (%d files, that took %dms)\n", files, time.Since(startupTime).Milliseconds()) } ctx, shutdown := context.WithCancel(context.Background())