Make import function interuptible and replace countJobs with external call to fd

This commit is contained in:
2025-12-17 06:32:53 +01:00
parent d30c6ef3bf
commit 88dc5036b3

View File

@@ -5,13 +5,18 @@
package main package main
import ( import (
"context"
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
"os" "os"
"os/exec"
"os/signal"
"strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"syscall"
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/config"
@@ -34,13 +39,56 @@ func parseDate(in string) int64 {
return 0 return 0
} }
// countJobs counts the total number of jobs in the source archive. // countJobs counts the total number of jobs in the source archive using external fd command.
func countJobs(srcBackend archive.ArchiveBackend) int { // It requires the fd binary to be available in PATH.
count := 0 // The srcConfig parameter should be the JSON configuration string containing the archive path.
for range srcBackend.Iter(false) { func countJobs(srcConfig string) (int, error) {
count++ fdPath, err := exec.LookPath("fd")
if err != nil {
return 0, fmt.Errorf("fd binary not found in PATH: %w", err)
} }
return count
var config struct {
Kind string `json:"kind"`
Path string `json:"path"`
}
if err := json.Unmarshal([]byte(srcConfig), &config); err != nil {
return 0, fmt.Errorf("failed to parse source config: %w", err)
}
if config.Path == "" {
return 0, fmt.Errorf("no path found in source config")
}
fdCmd := exec.Command(fdPath, "meta.json", config.Path)
wcCmd := exec.Command("wc", "-l")
pipe, err := fdCmd.StdoutPipe()
if err != nil {
return 0, fmt.Errorf("failed to create pipe: %w", err)
}
wcCmd.Stdin = pipe
if err := fdCmd.Start(); err != nil {
return 0, fmt.Errorf("failed to start fd command: %w", err)
}
output, err := wcCmd.Output()
if err != nil {
return 0, fmt.Errorf("failed to run wc command: %w", err)
}
if err := fdCmd.Wait(); err != nil {
return 0, fmt.Errorf("fd command failed: %w", err)
}
countStr := strings.TrimSpace(string(output))
count, err := strconv.Atoi(countStr)
if err != nil {
return 0, fmt.Errorf("failed to parse count from wc output '%s': %w", countStr, err)
}
return count, nil
} }
// formatDuration formats a duration as a human-readable string. // formatDuration formats a duration as a human-readable string.
@@ -137,12 +185,34 @@ func (p *progressMeter) stop() {
// importArchive imports all jobs from a source archive backend to a destination archive backend. // importArchive imports all jobs from a source archive backend to a destination archive backend.
// It uses parallel processing with a worker pool to improve performance. // It uses parallel processing with a worker pool to improve performance.
// The import can be interrupted by CTRL-C (SIGINT) and will terminate gracefully.
// Returns the number of successfully imported jobs, failed jobs, and any error encountered. // Returns the number of successfully imported jobs, failed jobs, and any error encountered.
func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, error) { func importArchive(srcBackend, dstBackend archive.ArchiveBackend, srcConfig string) (int, int, error) {
cclog.Info("Starting parallel archive import...") cclog.Info("Starting parallel archive import...")
cclog.Info("Press CTRL-C to interrupt (will finish current jobs before exiting)")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
var interrupted atomic.Bool
go func() {
<-sigChan
cclog.Warn("Interrupt received, stopping import (finishing current jobs)...")
interrupted.Store(true)
cancel()
// Stop listening for further signals to allow force quit with second CTRL-C
signal.Stop(sigChan)
}()
cclog.Info("Counting jobs in source archive (this may take a long time) ...") cclog.Info("Counting jobs in source archive (this may take a long time) ...")
totalJobs := countJobs(srcBackend) totalJobs, err := countJobs(srcConfig)
if err != nil {
return 0, 0, fmt.Errorf("failed to count jobs: %w", err)
}
cclog.Infof("Found %d jobs to process", totalJobs) cclog.Infof("Found %d jobs to process", totalJobs)
progress := newProgressMeter(totalJobs) progress := newProgressMeter(totalJobs)
@@ -200,8 +270,14 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err
} }
go func() { go func() {
defer close(jobs)
clusters := srcBackend.GetClusters() clusters := srcBackend.GetClusters()
for _, clusterName := range clusters { for _, clusterName := range clusters {
if ctx.Err() != nil {
return
}
clusterCfg, err := srcBackend.LoadClusterCfg(clusterName) clusterCfg, err := srcBackend.LoadClusterCfg(clusterName)
if err != nil { if err != nil {
cclog.Errorf("Failed to load cluster config for %s: %v", clusterName, err) cclog.Errorf("Failed to load cluster config for %s: %v", clusterName, err)
@@ -216,9 +292,14 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err
} }
for job := range srcBackend.Iter(true) { for job := range srcBackend.Iter(true) {
jobs <- job select {
case <-ctx.Done():
// Drain remaining items from iterator to avoid resource leak
// but don't process them
return
case jobs <- job:
}
} }
close(jobs)
}() }()
wg.Wait() wg.Wait()
@@ -229,6 +310,13 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, err
finalSkipped := int(atomic.LoadInt32(&progress.skipped)) finalSkipped := int(atomic.LoadInt32(&progress.skipped))
elapsed := time.Since(progress.startTime) elapsed := time.Since(progress.startTime)
if interrupted.Load() {
cclog.Warnf("Import interrupted after %s: %d jobs imported, %d skipped, %d failed",
formatDuration(elapsed), finalImported, finalSkipped, finalFailed)
return finalImported, finalFailed, fmt.Errorf("import interrupted by user")
}
cclog.Infof("Import completed in %s: %d jobs imported, %d skipped, %d failed", cclog.Infof("Import completed in %s: %d jobs imported, %d skipped, %d failed",
formatDuration(elapsed), finalImported, finalSkipped, finalFailed) formatDuration(elapsed), finalImported, finalSkipped, finalFailed)
@@ -284,7 +372,7 @@ func main() {
cclog.Info("Destination backend initialized successfully") cclog.Info("Destination backend initialized successfully")
// Perform import // Perform import
imported, failed, err := importArchive(srcBackend, dstBackend) imported, failed, err := importArchive(srcBackend, dstBackend, flagSrcConfig)
if err != nil { if err != nil {
cclog.Errorf("Import completed with errors: %s", err.Error()) cclog.Errorf("Import completed with errors: %s", err.Error())
if failed > 0 { if failed > 0 {