diff --git a/archive.go b/archive.go
index 4746ad2..aef367a 100644
--- a/archive.go
+++ b/archive.go
@@ -20,12 +20,34 @@ import (
 	"sync/atomic"
 )
 
+// Whenever changed, update MarshalJSON as well!
 type CheckpointMetrics struct {
 	Frequency int64   `json:"frequency"`
 	Start     int64   `json:"start"`
 	Data      []Float `json:"data"`
 }
 
+func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) {
+	buf := make([]byte, 0, 128+len(cm.Data)*8)
+	buf = append(buf, `{"frequency":`...)
+	buf = strconv.AppendInt(buf, cm.Frequency, 10)
+	buf = append(buf, `,"start":`...)
+	buf = strconv.AppendInt(buf, cm.Start, 10)
+	buf = append(buf, `,"data":[`...)
+	for i, x := range cm.Data {
+		if i != 0 {
+			buf = append(buf, ',')
+		}
+		if x.IsNaN() {
+			buf = append(buf, `null`...)
+		} else {
+			buf = strconv.AppendFloat(buf, float64(x), 'f', -1, 32)
+		}
+	}
+	buf = append(buf, `]}`...)
+	return buf, nil
+}
+
 type CheckpointFile struct {
 	From     int64                         `json:"from"`
 	To       int64                         `json:"to"`
@@ -76,7 +98,7 @@ func (m *MemoryStore) ToCheckpoint(dir string, from, to int64) (int, error) {
 						continue
 					}
 
-					log.Printf("checkpointing %#v failed: %s", workItem.selector, err.Error())
+					log.Printf("error while checkpointing %#v: %s", workItem.selector, err.Error())
 					atomic.AddInt32(&errs, 1)
 				} else {
 					atomic.AddInt32(&n, 1)
@@ -216,8 +238,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
 				nn, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m)
 				if err != nil {
 					log.Fatalf("error while loading checkpoints: %s", err.Error())
-					atomic.AddInt32(&errs, int32(nn))
-					continue
+					atomic.AddInt32(&errs, 1)
 				}
 				atomic.AddInt32(&n, int32(nn))
 			}
@@ -436,26 +457,57 @@ func ArchiveCheckpoints(checkpointsDir, archiveDir string, from int64) (int, err
 		return 0, err
 	}
 
-	n := 0
+	type workItem struct {
+		cdir, adir    string
+		cluster, host string
+	}
+
+	var wg sync.WaitGroup
+	n, errs := int32(0), int32(0)
+	work := make(chan workItem, NumWorkers*2)
+
+	wg.Add(NumWorkers)
+	for worker := 0; worker < NumWorkers; worker++ {
+		go func() {
+			defer wg.Done()
+			for workItem := range work {
+				m, err := archiveCheckpoints(workItem.cdir, workItem.adir, from)
+				if err != nil {
+					log.Printf("error while archiving %s/%s: %s", workItem.cluster, workItem.host, err.Error())
+					atomic.AddInt32(&errs, 1)
+				}
+				atomic.AddInt32(&n, int32(m))
+			}
+		}()
+	}
+
 	for _, de1 := range entries1 {
-		entries2, err := os.ReadDir(filepath.Join(checkpointsDir, de1.Name()))
-		if err != nil {
-			return n, err
+		entries2, e := os.ReadDir(filepath.Join(checkpointsDir, de1.Name()))
+		if e != nil {
+			err = e
 		}
 
 		for _, de2 := range entries2 {
 			cdir := filepath.Join(checkpointsDir, de1.Name(), de2.Name())
 			adir := filepath.Join(archiveDir, de1.Name(), de2.Name())
-			m, err := archiveCheckpoints(cdir, adir, from)
-			n += m
-			if err != nil {
-				return n, err
+			work <- workItem{
+				adir: adir, cdir: cdir,
+				cluster: de1.Name(), host: de2.Name(),
 			}
 		}
 	}
 
-	return n, nil
+	close(work)
+	wg.Wait()
+
+	if err != nil {
+		return int(n), err
+	}
+
+	if errs > 0 {
+		return int(n), fmt.Errorf("%d errors happened while archiving (%d successes)", errs, n)
+	}
+	return int(n), nil
 }
 
 // Helper function for `ArchiveCheckpoints`.
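
Note (not part of the patch): the hand-written MarshalJSON above is there so that NaN samples serialize as JSON null instead of making encoding/json fail, and so the checkpoint hot path avoids reflection-based encoding. The standalone sketch below mirrors that technique; the Float stand-in type, the sample values, and the package main wrapper are assumptions for illustration only, since the repository's real Float type is defined elsewhere.

package main

import (
	"encoding/json"
	"fmt"
	"math"
	"strconv"
)

// Float stands in for the repository's metric float type (assumed to wrap float32).
type Float float32

func (f Float) IsNaN() bool { return math.IsNaN(float64(f)) }

// CheckpointMetrics mirrors the struct from the diff.
type CheckpointMetrics struct {
	Frequency int64   `json:"frequency"`
	Start     int64   `json:"start"`
	Data      []Float `json:"data"`
}

// MarshalJSON builds the JSON by hand with strconv so that NaN values become
// null; the default encoder would instead return an "unsupported value" error.
func (cm *CheckpointMetrics) MarshalJSON() ([]byte, error) {
	buf := make([]byte, 0, 128+len(cm.Data)*8)
	buf = append(buf, `{"frequency":`...)
	buf = strconv.AppendInt(buf, cm.Frequency, 10)
	buf = append(buf, `,"start":`...)
	buf = strconv.AppendInt(buf, cm.Start, 10)
	buf = append(buf, `,"data":[`...)
	for i, x := range cm.Data {
		if i != 0 {
			buf = append(buf, ',')
		}
		if x.IsNaN() {
			buf = append(buf, `null`...)
		} else {
			buf = strconv.AppendFloat(buf, float64(x), 'f', -1, 32)
		}
	}
	buf = append(buf, `]}`...)
	return buf, nil
}

func main() {
	cm := &CheckpointMetrics{Frequency: 60, Start: 1000, Data: []Float{1.5, Float(math.NaN()), 3}}
	out, err := json.Marshal(cm) // json.Marshal delegates to the custom MarshalJSON
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"frequency":60,"start":1000,"data":[1.5,null,3]}
}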