diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go
index ae0ae9e..940c92d 100644
--- a/tools/archive-manager/main.go
+++ b/tools/archive-manager/main.go
@@ -5,13 +5,18 @@
 package main
 
 import (
+	"context"
 	"encoding/json"
 	"flag"
 	"fmt"
 	"os"
+	"os/exec"
+	"os/signal"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
+	"syscall"
 	"time"
 
 	"github.com/ClusterCockpit/cc-backend/internal/config"
@@ -34,13 +39,56 @@ func parseDate(in string) int64 {
 	return 0
 }
 
-// countJobs counts the total number of jobs in the source archive.
-func countJobs(srcBackend archive.ArchiveBackend) int {
-	count := 0
-	for range srcBackend.Iter(false) {
-		count++
+// countJobs counts the total number of jobs in the source archive using external fd command.
+// It requires the fd binary to be available in PATH.
+// The srcConfig parameter should be the JSON configuration string containing the archive path.
+func countJobs(srcConfig string) (int, error) {
+	fdPath, err := exec.LookPath("fd")
+	if err != nil {
+		return 0, fmt.Errorf("fd binary not found in PATH: %w", err)
 	}
-	return count
+
+	var config struct {
+		Kind string `json:"kind"`
+		Path string `json:"path"`
+	}
+	if err := json.Unmarshal([]byte(srcConfig), &config); err != nil {
+		return 0, fmt.Errorf("failed to parse source config: %w", err)
+	}
+
+	if config.Path == "" {
+		return 0, fmt.Errorf("no path found in source config")
+	}
+
+	fdCmd := exec.Command(fdPath, "meta.json", config.Path)
+	wcCmd := exec.Command("wc", "-l")
+
+	pipe, err := fdCmd.StdoutPipe()
+	if err != nil {
+		return 0, fmt.Errorf("failed to create pipe: %w", err)
+	}
+	wcCmd.Stdin = pipe
+
+	if err := fdCmd.Start(); err != nil {
+		return 0, fmt.Errorf("failed to start fd command: %w", err)
+	}
+
+	output, err := wcCmd.Output()
+	if err != nil {
+		return 0, fmt.Errorf("failed to run wc command: %w", err)
+	}
+
+	if err := fdCmd.Wait(); err != nil {
+		return 0, fmt.Errorf("fd command failed: %w", err)
+	}
+
+	countStr := strings.TrimSpace(string(output))
+	count, err := strconv.Atoi(countStr)
+	if err != nil {
+		return 0, fmt.Errorf("failed to parse count from wc output '%s': %w", countStr, err)
+	}
+
+	return count, nil
 }
 
 // formatDuration formats a duration as a human-readable string.
@@ -137,12 +185,34 @@ func (p *progressMeter) stop() {
 
 // importArchive imports all jobs from a source archive backend to a destination archive backend.
 // It uses parallel processing with a worker pool to improve performance.
+// The import can be interrupted by CTRL-C (SIGINT) and will terminate gracefully.
 // Returns the number of successfully imported jobs, failed jobs, and any error encountered.
-func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, error) {
+func importArchive(srcBackend, dstBackend archive.ArchiveBackend, srcConfig string) (int, int, error) {
 	cclog.Info("Starting parallel archive import...")
+	cclog.Info("Press CTRL-C to interrupt (will finish current jobs before exiting)")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
+
+	var interrupted atomic.Bool
+
+	go func() {
+		<-sigChan
+		cclog.Warn("Interrupt received, stopping import (finishing current jobs)...")
+		interrupted.Store(true)
+		cancel()
+		// Stop listening for further signals to allow force quit with a second CTRL-C
+		signal.Stop(sigChan)
+	}()
 
 	cclog.Info("Counting jobs in source archive (this may take a long time) ...")
-	totalJobs := countJobs(srcBackend)
+	totalJobs, err := countJobs(srcConfig)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to count jobs: %w", err)
+	}
 	cclog.Infof("Found %d jobs to process", totalJobs)
 
 	progress := newProgressMeter(totalJobs)
@@ -200,8 +270,14 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err
 	}
 
 	go func() {
+		defer close(jobs)
+
 		clusters := srcBackend.GetClusters()
 		for _, clusterName := range clusters {
+			if ctx.Err() != nil {
+				return
+			}
+
 			clusterCfg, err := srcBackend.LoadClusterCfg(clusterName)
 			if err != nil {
 				cclog.Errorf("Failed to load cluster config for %s: %v", clusterName, err)
@@ -216,9 +292,14 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err
 		}
 
 		for job := range srcBackend.Iter(true) {
-			jobs <- job
+			select {
+			case <-ctx.Done():
+				// Interrupt requested: stop reading from the iterator and
+				// leave the remaining jobs unprocessed
+				return
+			case jobs <- job:
+			}
 		}
-		close(jobs)
 	}()
 
 	wg.Wait()
@@ -229,6 +310,13 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err
 	}
 	finalSkipped := int(atomic.LoadInt32(&progress.skipped))
 	elapsed := time.Since(progress.startTime)
+
+	if interrupted.Load() {
+		cclog.Warnf("Import interrupted after %s: %d jobs imported, %d skipped, %d failed",
+			formatDuration(elapsed), finalImported, finalSkipped, finalFailed)
+		return finalImported, finalFailed, fmt.Errorf("import interrupted by user")
+	}
+
 	cclog.Infof("Import completed in %s: %d jobs imported, %d skipped, %d failed",
 		formatDuration(elapsed), finalImported, finalSkipped, finalFailed)
 
@@ -284,7 +372,7 @@ func main() {
 	cclog.Info("Destination backend initialized successfully")
 
 	// Perform import
-	imported, failed, err := importArchive(srcBackend, dstBackend)
+	imported, failed, err := importArchive(srcBackend, dstBackend, flagSrcConfig)
 	if err != nil {
 		cclog.Errorf("Import completed with errors: %s", err.Error())
 		if failed > 0 {