diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go index 30aa908..ae0ae9e 100644 --- a/tools/archive-manager/main.go +++ b/tools/archive-manager/main.go @@ -9,6 +9,7 @@ import ( "flag" "fmt" "os" + "strings" "sync" "sync/atomic" "time" @@ -33,78 +34,172 @@ func parseDate(in string) int64 { return 0 } +// countJobs counts the total number of jobs in the source archive. +func countJobs(srcBackend archive.ArchiveBackend) int { + count := 0 + for range srcBackend.Iter(false) { + count++ + } + return count +} + +// formatDuration formats a duration as a human-readable string. +func formatDuration(d time.Duration) string { + if d < time.Minute { + return fmt.Sprintf("%ds", int(d.Seconds())) + } else if d < time.Hour { + return fmt.Sprintf("%dm%ds", int(d.Minutes()), int(d.Seconds())%60) + } + return fmt.Sprintf("%dh%dm", int(d.Hours()), int(d.Minutes())%60) +} + +// progressMeter displays import progress to the terminal. +type progressMeter struct { + total int + processed int32 + imported int32 + skipped int32 + failed int32 + startTime time.Time + done chan struct{} +} + +func newProgressMeter(total int) *progressMeter { + return &progressMeter{ + total: total, + startTime: time.Now(), + done: make(chan struct{}), + } +} + +func (p *progressMeter) start() { + go func() { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + p.render() + case <-p.done: + p.render() + fmt.Println() + return + } + } + }() +} + +func (p *progressMeter) render() { + processed := atomic.LoadInt32(&p.processed) + imported := atomic.LoadInt32(&p.imported) + skipped := atomic.LoadInt32(&p.skipped) + failed := atomic.LoadInt32(&p.failed) + + elapsed := time.Since(p.startTime) + percent := float64(processed) / float64(p.total) * 100 + if p.total == 0 { + percent = 0 + } + + var eta string + var throughput float64 + if processed > 0 { + throughput = float64(processed) / elapsed.Seconds() + remaining := 
float64(p.total-int(processed)) / throughput + eta = formatDuration(time.Duration(remaining) * time.Second) + } else { + eta = "calculating..." + } + + barWidth := 30 + filled := int(float64(barWidth) * float64(processed) / float64(p.total)) + if p.total == 0 { + filled = 0 + } + + var bar strings.Builder + for i := range barWidth { + if i < filled { + bar.WriteString("█") + } else { + bar.WriteString("░") + } + } + + fmt.Printf("\r[%s] %5.1f%% | %d/%d | %.1f jobs/s | ETA: %s | ✓%d ○%d ✗%d ", + bar.String(), percent, processed, p.total, throughput, eta, imported, skipped, failed) +} + +func (p *progressMeter) stop() { + close(p.done) +} + // importArchive imports all jobs from a source archive backend to a destination archive backend. // It uses parallel processing with a worker pool to improve performance. // Returns the number of successfully imported jobs, failed jobs, and any error encountered. func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, error) { cclog.Info("Starting parallel archive import...") - // Use atomic counters for thread-safe updates - var imported int32 - var failed int32 - var skipped int32 + cclog.Info("Counting jobs in source archive (this may take a long time) ...") + totalJobs := countJobs(srcBackend) + cclog.Infof("Found %d jobs to process", totalJobs) + + progress := newProgressMeter(totalJobs) - // Number of parallel workers numWorkers := 4 cclog.Infof("Using %d parallel workers", numWorkers) - // Create channels for job distribution jobs := make(chan archive.JobContainer, numWorkers*2) - // WaitGroup to track worker completion var wg sync.WaitGroup - // Start worker goroutines + progress.start() + for i := range numWorkers { wg.Add(1) go func(workerID int) { defer wg.Done() for job := range jobs { - // Validate job metadata if job.Meta == nil { cclog.Warn("Skipping job with nil metadata") - atomic.AddInt32(&failed, 1) + atomic.AddInt32(&progress.failed, 1) + atomic.AddInt32(&progress.processed, 1) continue } 
- // Validate job data if job.Data == nil { cclog.Warnf("Job %d from cluster %s has no metric data, skipping", job.Meta.JobID, job.Meta.Cluster) - atomic.AddInt32(&failed, 1) + atomic.AddInt32(&progress.failed, 1) + atomic.AddInt32(&progress.processed, 1) continue } - // Check if job already exists in destination if dstBackend.Exists(job.Meta) { cclog.Debugf("Job %d (cluster: %s, start: %d) already exists in destination, skipping", job.Meta.JobID, job.Meta.Cluster, job.Meta.StartTime) - atomic.AddInt32(&skipped, 1) + atomic.AddInt32(&progress.skipped, 1) + atomic.AddInt32(&progress.processed, 1) continue } - // Import job to destination if err := dstBackend.ImportJob(job.Meta, job.Data); err != nil { cclog.Errorf("Failed to import job %d from cluster %s: %s", job.Meta.JobID, job.Meta.Cluster, err.Error()) - atomic.AddInt32(&failed, 1) + atomic.AddInt32(&progress.failed, 1) + atomic.AddInt32(&progress.processed, 1) continue } - // Successfully imported - newCount := atomic.AddInt32(&imported, 1) - if newCount%100 == 0 { - cclog.Infof("Progress: %d jobs imported, %d skipped, %d failed", - newCount, atomic.LoadInt32(&skipped), atomic.LoadInt32(&failed)) - } + atomic.AddInt32(&progress.imported, 1) + atomic.AddInt32(&progress.processed, 1) } }(i) } - // Feed jobs to workers go func() { - // Import cluster configs first clusters := srcBackend.GetClusters() for _, clusterName := range clusters { clusterCfg, err := srcBackend.LoadClusterCfg(clusterName) @@ -126,15 +221,16 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err close(jobs) }() - // Wait for all workers to complete wg.Wait() + progress.stop() - finalImported := int(atomic.LoadInt32(&imported)) - finalFailed := int(atomic.LoadInt32(&failed)) - finalSkipped := int(atomic.LoadInt32(&skipped)) + finalImported := int(atomic.LoadInt32(&progress.imported)) + finalFailed := int(atomic.LoadInt32(&progress.failed)) + finalSkipped := int(atomic.LoadInt32(&progress.skipped)) - 
cclog.Infof("Import completed: %d jobs imported, %d skipped, %d failed", - finalImported, finalSkipped, finalFailed) + elapsed := time.Since(progress.startTime) + cclog.Infof("Import completed in %s: %d jobs imported, %d skipped, %d failed", + formatDuration(elapsed), finalImported, finalSkipped, finalFailed) if finalFailed > 0 { return finalImported, finalFailed, fmt.Errorf("%d jobs failed to import", finalFailed) @@ -150,7 +246,7 @@ func main() { flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") - flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`") + flag.StringVar(&flagLogLevel, "loglevel", "info", "Sets the logging level: `[debug,info (default),warn,err,fatal,crit]`") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") flag.StringVar(&flagRemoveCluster, "remove-cluster", "", "Remove cluster from archive and database") flag.StringVar(&flagRemoveBefore, "remove-before", "", "Remove all jobs with start time before date (Format: 2006-Jan-04)")