// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. package main import ( "encoding/json" "flag" "fmt" "os" "strings" "sync" "sync/atomic" "time" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/pkg/archive" ccconf "github.com/ClusterCockpit/cc-lib/ccConfig" cclog "github.com/ClusterCockpit/cc-lib/ccLogger" ) func parseDate(in string) int64 { const shortForm = "2006-Jan-02" loc, _ := time.LoadLocation("Local") if in != "" { t, err := time.ParseInLocation(shortForm, in, loc) if err != nil { cclog.Abortf("Archive Manager Main: Date parse failed with input: '%s'\nError: %s\n", in, err.Error()) } return t.Unix() } return 0 } // countJobs counts the total number of jobs in the source archive. func countJobs(srcBackend archive.ArchiveBackend) int { count := 0 for range srcBackend.Iter(false) { count++ } return count } // formatDuration formats a duration as a human-readable string. func formatDuration(d time.Duration) string { if d < time.Minute { return fmt.Sprintf("%ds", int(d.Seconds())) } else if d < time.Hour { return fmt.Sprintf("%dm%ds", int(d.Minutes()), int(d.Seconds())%60) } return fmt.Sprintf("%dh%dm", int(d.Hours()), int(d.Minutes())%60) } // progressMeter displays import progress to the terminal. type progressMeter struct { total int processed int32 imported int32 skipped int32 failed int32 startTime time.Time done chan struct{} } func newProgressMeter(total int) *progressMeter { return &progressMeter{ total: total, startTime: time.Now(), done: make(chan struct{}), } } func (p *progressMeter) start() { go func() { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() for { select { case <-ticker.C: p.render() case <-p.done: p.render() fmt.Println() return } } }() } func (p *progressMeter) render() { processed := atomic.LoadInt32(&p.processed) imported := atomic.LoadInt32(&p.imported) skipped := atomic.LoadInt32(&p.skipped) failed := atomic.LoadInt32(&p.failed) elapsed := time.Since(p.startTime) percent := float64(processed) / float64(p.total) * 100 if p.total == 0 { percent = 0 } var eta string var throughput float64 if processed > 0 { throughput = float64(processed) / elapsed.Seconds() remaining := float64(p.total-int(processed)) / throughput eta = formatDuration(time.Duration(remaining) * time.Second) } else { eta = "calculating..." } barWidth := 30 filled := int(float64(barWidth) * float64(processed) / float64(p.total)) if p.total == 0 { filled = 0 } var bar strings.Builder for i := range barWidth { if i < filled { bar.WriteString("█") } else { bar.WriteString("░") } } fmt.Printf("\r[%s] %5.1f%% | %d/%d | %.1f jobs/s | ETA: %s | ✓%d ○%d ✗%d ", bar.String(), percent, processed, p.total, throughput, eta, imported, skipped, failed) } func (p *progressMeter) stop() { close(p.done) } // importArchive imports all jobs from a source archive backend to a destination archive backend. // It uses parallel processing with a worker pool to improve performance. // Returns the number of successfully imported jobs, failed jobs, and any error encountered. func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, error) { cclog.Info("Starting parallel archive import...") cclog.Info("Counting jobs in source archive (this may take a long time) ...") totalJobs := countJobs(srcBackend) cclog.Infof("Found %d jobs to process", totalJobs) progress := newProgressMeter(totalJobs) numWorkers := 4 cclog.Infof("Using %d parallel workers", numWorkers) jobs := make(chan archive.JobContainer, numWorkers*2) var wg sync.WaitGroup progress.start() for i := range numWorkers { wg.Add(1) go func(workerID int) { defer wg.Done() for job := range jobs { if job.Meta == nil { cclog.Warn("Skipping job with nil metadata") atomic.AddInt32(&progress.failed, 1) atomic.AddInt32(&progress.processed, 1) continue } if job.Data == nil { cclog.Warnf("Job %d from cluster %s has no metric data, skipping", job.Meta.JobID, job.Meta.Cluster) atomic.AddInt32(&progress.failed, 1) atomic.AddInt32(&progress.processed, 1) continue } if dstBackend.Exists(job.Meta) { cclog.Debugf("Job %d (cluster: %s, start: %d) already exists in destination, skipping", job.Meta.JobID, job.Meta.Cluster, job.Meta.StartTime) atomic.AddInt32(&progress.skipped, 1) atomic.AddInt32(&progress.processed, 1) continue } if err := dstBackend.ImportJob(job.Meta, job.Data); err != nil { cclog.Errorf("Failed to import job %d from cluster %s: %s", job.Meta.JobID, job.Meta.Cluster, err.Error()) atomic.AddInt32(&progress.failed, 1) atomic.AddInt32(&progress.processed, 1) continue } atomic.AddInt32(&progress.imported, 1) atomic.AddInt32(&progress.processed, 1) } }(i) } go func() { clusters := srcBackend.GetClusters() for _, clusterName := range clusters { clusterCfg, err := srcBackend.LoadClusterCfg(clusterName) if err != nil { cclog.Errorf("Failed to load cluster config for %s: %v", clusterName, err) continue } if err := dstBackend.StoreClusterCfg(clusterName, clusterCfg); err != nil { cclog.Errorf("Failed to store cluster config for %s: %v", clusterName, err) } else { cclog.Infof("Imported cluster config for %s", clusterName) } } for job := range srcBackend.Iter(true) { jobs <- job } close(jobs) }() wg.Wait() progress.stop() finalImported := int(atomic.LoadInt32(&progress.imported)) finalFailed := int(atomic.LoadInt32(&progress.failed)) finalSkipped := int(atomic.LoadInt32(&progress.skipped)) elapsed := time.Since(progress.startTime) cclog.Infof("Import completed in %s: %d jobs imported, %d skipped, %d failed", formatDuration(elapsed), finalImported, finalSkipped, finalFailed) if finalFailed > 0 { return finalImported, finalFailed, fmt.Errorf("%d jobs failed to import", finalFailed) } return finalImported, finalFailed, nil } func main() { var srcPath, flagConfigFile, flagLogLevel, flagRemoveCluster, flagRemoveAfter, flagRemoveBefore string var flagSrcConfig, flagDstConfig string var flagLogDateTime, flagValidate, flagImport bool flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") flag.StringVar(&flagLogLevel, "loglevel", "info", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`") flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") flag.StringVar(&flagRemoveCluster, "remove-cluster", "", "Remove cluster from archive and database") flag.StringVar(&flagRemoveBefore, "remove-before", "", "Remove all jobs with start time before date (Format: 2006-Jan-04)") flag.StringVar(&flagRemoveAfter, "remove-after", "", "Remove all jobs with start time after date (Format: 2006-Jan-04)") flag.BoolVar(&flagValidate, "validate", false, "Set this flag to validate a job archive against the json schema") flag.BoolVar(&flagImport, "import", false, "Import jobs from source archive to destination archive") flag.StringVar(&flagSrcConfig, "src-config", "", "Source archive backend configuration (JSON), e.g. '{\"kind\":\"file\",\"path\":\"./archive\"}'") flag.StringVar(&flagDstConfig, "dst-config", "", "Destination archive backend configuration (JSON), e.g. '{\"kind\":\"sqlite\",\"dbPath\":\"./archive.db\"}'") flag.Parse() archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", srcPath) cclog.Init(flagLogLevel, flagLogDateTime) // Handle import mode if flagImport { if flagSrcConfig == "" || flagDstConfig == "" { cclog.Fatal("Both --src-config and --dst-config must be specified for import mode") } cclog.Info("Import mode: initializing source and destination backends...") // Initialize source backend srcBackend, err := archive.InitBackend(json.RawMessage(flagSrcConfig)) if err != nil { cclog.Fatalf("Failed to initialize source backend: %s", err.Error()) } cclog.Info("Source backend initialized successfully") // Initialize destination backend dstBackend, err := archive.InitBackend(json.RawMessage(flagDstConfig)) if err != nil { cclog.Fatalf("Failed to initialize destination backend: %s", err.Error()) } cclog.Info("Destination backend initialized successfully") // Perform import imported, failed, err := importArchive(srcBackend, dstBackend) if err != nil { cclog.Errorf("Import completed with errors: %s", err.Error()) if failed > 0 { os.Exit(1) } } cclog.Infof("Import finished successfully: %d jobs imported", imported) os.Exit(0) } ccconf.Init(flagConfigFile) // Load and check main configuration if cfg := ccconf.GetPackageConfig("main"); cfg != nil { if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil { config.Init(cfg, clustercfg) } else { cclog.Abort("Cluster configuration must be present") } } else { cclog.Abort("Main configuration must be present") } if err := archive.Init(json.RawMessage(archiveCfg), false); err != nil { cclog.Fatal(err) } ar := archive.GetHandle() if flagValidate { config.Keys.Validate = true for job := range ar.Iter(true) { cclog.Debugf("Validate %s - %d", job.Meta.Cluster, job.Meta.JobID) } os.Exit(0) } if flagRemoveBefore != "" || flagRemoveAfter != "" { ar.Clean(parseDate(flagRemoveBefore), parseDate(flagRemoveAfter)) os.Exit(0) } ar.Info() }