Continue even if checkpointing some singel file fails

This commit is contained in:
Lou Knauer 2022-02-17 10:11:27 +01:00
parent 7201251600
commit 6ab7b6879c

View File

@ -53,7 +53,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
} }
m.root.lock.RUnlock() m.root.lock.RUnlock()
n := 0 n, errs := 0, 0
for i := 0; i < len(levels); i++ { for i := 0; i < len(levels); i++ {
dir := path.Join(dir, path.Join(selectors[i]...)) dir := path.Join(dir, path.Join(selectors[i]...))
err := levels[i].toCheckpoint(dir, from, to, m) err := levels[i].toCheckpoint(dir, from, to, m)
@ -62,12 +62,17 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
continue continue
} }
return i, err log.Printf("checkpointing %#v failed: %s", selectors[i], err.Error())
errs += 1
continue
} }
n += 1 n += 1
} }
if errs > 0 {
return n, fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errs, n)
}
return n, nil return n, nil
} }
@ -211,6 +216,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
}() }()
loadedFiles := int32(0) loadedFiles := int32(0)
errors := int32(0)
for worker := 0; worker < workers; worker++ { for worker := 0; worker < workers; worker++ {
go func() { go func() {
@ -220,6 +226,8 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
n, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m) n, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m)
if err != nil { if err != nil {
log.Fatalf("error while loading checkpoints: %s", err.Error()) log.Fatalf("error while loading checkpoints: %s", err.Error())
atomic.AddInt32(&errors, int32(n))
continue
} }
atomic.AddInt32(&loadedFiles, int32(n)) atomic.AddInt32(&loadedFiles, int32(n))
} }
@ -227,13 +235,16 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
} }
wg.Wait() wg.Wait()
var err error = nil
select { select {
case e := <-errs: case err := <-errs:
err = e return int(loadedFiles), err
default: default:
} }
return int(loadedFiles), err
if errors > 0 {
return int(loadedFiles), fmt.Errorf("%d errors happend while creating checkpoints (%d successes)", errors, loadedFiles)
}
return int(loadedFiles), nil
} }
func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error { func (l *level) loadFile(cf *CheckpointFile, m *MemoryStore) error {