diff --git a/archive.go b/archive.go index 48d4004..4746ad2 100644 --- a/archive.go +++ b/archive.go @@ -35,6 +35,8 @@ type CheckpointFile struct { var ErrNoNewData error = errors.New("all data already archived") +var NumWorkers int = (runtime.NumCPU() / 4) + 1 + // Metrics stored at the lowest 2 levels are not stored away (root and cluster)! // On a per-host basis a new JSON file is created. I have no idea if this will scale. // The good thing: Only a host at a time is locked, so this function can run @@ -53,27 +55,52 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) { } m.root.lock.RUnlock() - n, errs := 0, 0 + type workItem struct { + level *level + dir string + selector []string + } + + n, errs := int32(0), int32(0) + + var wg sync.WaitGroup + wg.Add(NumWorkers) + work := make(chan workItem, NumWorkers*2) + for worker := 0; worker < NumWorkers; worker++ { + go func() { + defer wg.Done() + + for workItem := range work { + if err := workItem.level.toCheckpoint(workItem.dir, from, to, m); err != nil { + if err == ErrNoNewData { + continue + } + + log.Printf("checkpointing %#v failed: %s", workItem.selector, err.Error()) + atomic.AddInt32(&errs, 1) + } else { + atomic.AddInt32(&n, 1) + } + } + }() + } + for i := 0; i < len(levels); i++ { dir := path.Join(dir, path.Join(selectors[i]...)) - err := levels[i].toCheckpoint(dir, from, to, m) - if err != nil { - if err == ErrNoNewData { - continue - } - - log.Printf("checkpointing %#v failed: %s", selectors[i], err.Error()) - errs += 1 - continue + work <- workItem{ + level: levels[i], + dir: dir, + selector: selectors[i], } - - n += 1 } + close(work) + wg.Wait() + if errs > 0 { - return n, fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n) + return int(n), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n) } - return n, nil + return int(n), nil } func (l *level) toCheckpointFile(from, to int64, m *MemoryStore) (*CheckpointFile, error) { @@ -176,75 +203,61 @@ func (l *level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error { // This function can only be called once and before the very first write or read. // Different host's data is loaded to memory in parallel. func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { - workers := (runtime.NumCPU() / 2) + 1 - work := make(chan [2]string, workers*2) - errs := make(chan error, 1) var wg sync.WaitGroup - wg.Add(workers + 1) + work := make(chan [2]string, NumWorkers*2) + n, errs := int32(0), int32(0) - go func() { - defer close(work) - defer wg.Done() - - clustersDir, err := os.ReadDir(dir) - if err != nil { - errs <- err - return - } - - for _, clusterDir := range clustersDir { - if !clusterDir.IsDir() { - errs <- errors.New("expected only directories at first level of checkpoints/ directory") - return - } - - hostsDir, err := os.ReadDir(filepath.Join(dir, clusterDir.Name())) - if err != nil { - errs <- err - return - } - - for _, hostDir := range hostsDir { - if !hostDir.IsDir() { - errs <- errors.New("expected only directories at second level of checkpoints/ directory") - return - } - - work <- [2]string{clusterDir.Name(), hostDir.Name()} - } - } - }() - - loadedFiles := int32(0) - errors := int32(0) - - for worker := 0; worker < workers; worker++ { + wg.Add(NumWorkers) + for worker := 0; worker < NumWorkers; worker++ { go func() { defer wg.Done() for host := range work { lvl := m.root.findLevelOrCreate(host[:], len(m.metrics)) - n, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m) + nn, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m) if err != nil { log.Fatalf("error while loading checkpoints: %s", err.Error()) - atomic.AddInt32(&errors, int32(n)) + atomic.AddInt32(&errs, int32(nn)) continue } - atomic.AddInt32(&loadedFiles, int32(n)) + atomic.AddInt32(&n, int32(nn)) } }() } + clustersDir, err := os.ReadDir(dir) + for _, clusterDir := range clustersDir { + if !clusterDir.IsDir() { + err = errors.New("expected only directories at first level of checkpoints/ directory") + goto done + } + + hostsDir, e := os.ReadDir(filepath.Join(dir, clusterDir.Name())) + if e != nil { + err = e + goto done + } + + for _, hostDir := range hostsDir { + if !hostDir.IsDir() { + err = errors.New("expected only directories at second level of checkpoints/ directory") + goto done + } + + work <- [2]string{clusterDir.Name(), hostDir.Name()} + } + } +done: + close(work) wg.Wait() - select { - case err := <-errs: - return int(loadedFiles), err - default: + + if err != nil { + return int(n), err } - if errors > 0 { - return int(loadedFiles), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errors, loadedFiles) + if errs > 0 { + return int(n), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n) } - return int(loadedFiles), nil + return int(n), nil } func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error {