diff --git a/tools/archive-manager/archive-manager b/tools/archive-manager/archive-manager new file mode 100755 index 0000000..4f72c97 Binary files /dev/null and b/tools/archive-manager/archive-manager differ diff --git a/tools/archive-manager/import_test.go b/tools/archive-manager/import_test.go new file mode 100644 index 0000000..de0a04d --- /dev/null +++ b/tools/archive-manager/import_test.go @@ -0,0 +1,335 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package main + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-lib/schema" + "github.com/ClusterCockpit/cc-lib/util" +) + +// TestImportFileToSqlite tests importing jobs from file backend to SQLite backend +func TestImportFileToSqlite(t *testing.T) { + // Create temporary directories + tmpdir := t.TempDir() + srcArchive := filepath.Join(tmpdir, "src-archive") + dstDb := filepath.Join(tmpdir, "dst-archive.db") + + // Copy test data to source archive + testDataPath := "../../pkg/archive/testdata/archive" + if _, err := os.Stat(testDataPath); os.IsNotExist(err) { + t.Skip("Test data not found, skipping integration test") + } + + if err := util.CopyDir(testDataPath, srcArchive); err != nil { + t.Fatalf("Failed to copy test data: %s", err.Error()) + } + + // Initialize source backend (file) + srcConfig := fmt.Sprintf(`{"kind":"file","path":"%s"}`, srcArchive) + srcBackend, err := archive.InitBackend(json.RawMessage(srcConfig)) + if err != nil { + t.Fatalf("Failed to initialize source backend: %s", err.Error()) + } + + // Initialize destination backend (sqlite) + dstConfig := fmt.Sprintf(`{"kind":"sqlite","dbPath":"%s"}`, dstDb) + dstBackend, err := archive.InitBackend(json.RawMessage(dstConfig)) + if err != nil { + t.Fatalf("Failed 
to initialize destination backend: %s", err.Error()) + } + + // Perform import + imported, failed, err := importArchive(srcBackend, dstBackend) + if err != nil { + t.Errorf("Import failed: %s", err.Error()) + } + + if imported == 0 { + t.Error("No jobs were imported") + } + + if failed > 0 { + t.Errorf("%d jobs failed to import", failed) + } + + t.Logf("Successfully imported %d jobs", imported) + + // Verify jobs exist in destination + // Count jobs in source + srcCount := 0 + for range srcBackend.Iter(false) { + srcCount++ + } + + // Count jobs in destination + dstCount := 0 + for range dstBackend.Iter(false) { + dstCount++ + } + + if srcCount != dstCount { + t.Errorf("Job count mismatch: source has %d jobs, destination has %d jobs", srcCount, dstCount) + } +} + +// TestImportFileToFile tests importing jobs from one file backend to another +func TestImportFileToFile(t *testing.T) { + // Create temporary directories + tmpdir := t.TempDir() + srcArchive := filepath.Join(tmpdir, "src-archive") + dstArchive := filepath.Join(tmpdir, "dst-archive") + + // Copy test data to source archive + testDataPath := "../../pkg/archive/testdata/archive" + if _, err := os.Stat(testDataPath); os.IsNotExist(err) { + t.Skip("Test data not found, skipping integration test") + } + + if err := util.CopyDir(testDataPath, srcArchive); err != nil { + t.Fatalf("Failed to copy test data: %s", err.Error()) + } + + // Create destination archive directory + if err := os.MkdirAll(dstArchive, 0755); err != nil { + t.Fatalf("Failed to create destination directory: %s", err.Error()) + } + + // Initialize source backend + srcConfig := fmt.Sprintf(`{"kind":"file","path":"%s"}`, srcArchive) + srcBackend, err := archive.InitBackend(json.RawMessage(srcConfig)) + if err != nil { + t.Fatalf("Failed to initialize source backend: %s", err.Error()) + } + + // Initialize destination backend + dstConfig := fmt.Sprintf(`{"kind":"file","path":"%s"}`, dstArchive) + dstBackend, err := 
archive.InitBackend(json.RawMessage(dstConfig)) + if err != nil { + t.Fatalf("Failed to initialize destination backend: %s", err.Error()) + } + + // Perform import + imported, failed, err := importArchive(srcBackend, dstBackend) + if err != nil { + t.Errorf("Import failed: %s", err.Error()) + } + + if imported == 0 { + t.Error("No jobs were imported") + } + + if failed > 0 { + t.Errorf("%d jobs failed to import", failed) + } + + t.Logf("Successfully imported %d jobs", imported) +} + +// TestImportDataIntegrity verifies that job metadata and data are correctly imported +func TestImportDataIntegrity(t *testing.T) { + // Create temporary directories + tmpdir := t.TempDir() + srcArchive := filepath.Join(tmpdir, "src-archive") + dstDb := filepath.Join(tmpdir, "dst-archive.db") + + // Copy test data to source archive + testDataPath := "../../pkg/archive/testdata/archive" + if _, err := os.Stat(testDataPath); os.IsNotExist(err) { + t.Skip("Test data not found, skipping integration test") + } + + if err := util.CopyDir(testDataPath, srcArchive); err != nil { + t.Fatalf("Failed to copy test data: %s", err.Error()) + } + + // Initialize backends + srcConfig := fmt.Sprintf(`{"kind":"file","path":"%s"}`, srcArchive) + srcBackend, err := archive.InitBackend(json.RawMessage(srcConfig)) + if err != nil { + t.Fatalf("Failed to initialize source backend: %s", err.Error()) + } + + dstConfig := fmt.Sprintf(`{"kind":"sqlite","dbPath":"%s"}`, dstDb) + dstBackend, err := archive.InitBackend(json.RawMessage(dstConfig)) + if err != nil { + t.Fatalf("Failed to initialize destination backend: %s", err.Error()) + } + + // Perform import + _, _, err = importArchive(srcBackend, dstBackend) + if err != nil { + t.Errorf("Import failed: %s", err.Error()) + } + + // Verify data integrity for each job + verifiedJobs := 0 + for srcJob := range srcBackend.Iter(false) { + if srcJob.Meta == nil { + continue + } + + // Load job from destination + dstJobMeta, err := dstBackend.LoadJobMeta(srcJob.Meta) + 
if err != nil { + t.Errorf("Failed to load job %d from destination: %s", srcJob.Meta.JobID, err.Error()) + continue + } + + // Verify basic metadata + if dstJobMeta.JobID != srcJob.Meta.JobID { + t.Errorf("JobID mismatch: expected %d, got %d", srcJob.Meta.JobID, dstJobMeta.JobID) + } + + if dstJobMeta.Cluster != srcJob.Meta.Cluster { + t.Errorf("Cluster mismatch for job %d: expected %s, got %s", + srcJob.Meta.JobID, srcJob.Meta.Cluster, dstJobMeta.Cluster) + } + + if dstJobMeta.StartTime != srcJob.Meta.StartTime { + t.Errorf("StartTime mismatch for job %d: expected %d, got %d", + srcJob.Meta.JobID, srcJob.Meta.StartTime, dstJobMeta.StartTime) + } + + // Load and verify job data + srcData, err := srcBackend.LoadJobData(srcJob.Meta) + if err != nil { + t.Errorf("Failed to load job data from source: %s", err.Error()) + continue + } + + dstData, err := dstBackend.LoadJobData(srcJob.Meta) + if err != nil { + t.Errorf("Failed to load job data from destination: %s", err.Error()) + continue + } + + // Verify metric data exists + if len(srcData) != len(dstData) { + t.Errorf("Metric count mismatch for job %d: expected %d, got %d", + srcJob.Meta.JobID, len(srcData), len(dstData)) + } + + verifiedJobs++ + } + + if verifiedJobs == 0 { + t.Error("No jobs were verified") + } + + t.Logf("Successfully verified %d jobs", verifiedJobs) +} + +// TestImportEmptyArchive tests importing from an empty archive +func TestImportEmptyArchive(t *testing.T) { + tmpdir := t.TempDir() + srcArchive := filepath.Join(tmpdir, "empty-archive") + dstDb := filepath.Join(tmpdir, "dst-archive.db") + + // Create empty source archive + if err := os.MkdirAll(srcArchive, 0755); err != nil { + t.Fatalf("Failed to create source directory: %s", err.Error()) + } + + // Write version file + versionFile := filepath.Join(srcArchive, "version.txt") + if err := os.WriteFile(versionFile, []byte("3"), 0644); err != nil { + t.Fatalf("Failed to write version file: %s", err.Error()) + } + + // Initialize backends + 
srcConfig := fmt.Sprintf(`{"kind":"file","path":"%s"}`, srcArchive) + srcBackend, err := archive.InitBackend(json.RawMessage(srcConfig)) + if err != nil { + t.Fatalf("Failed to initialize source backend: %s", err.Error()) + } + + dstConfig := fmt.Sprintf(`{"kind":"sqlite","dbPath":"%s"}`, dstDb) + dstBackend, err := archive.InitBackend(json.RawMessage(dstConfig)) + if err != nil { + t.Fatalf("Failed to initialize destination backend: %s", err.Error()) + } + + // Perform import + imported, failed, err := importArchive(srcBackend, dstBackend) + if err != nil { + t.Errorf("Import from empty archive should not fail: %s", err.Error()) + } + + if imported != 0 { + t.Errorf("Expected 0 imported jobs, got %d", imported) + } + + if failed != 0 { + t.Errorf("Expected 0 failed jobs, got %d", failed) + } +} + +// TestImportDuplicateJobs tests that duplicate jobs are skipped +func TestImportDuplicateJobs(t *testing.T) { + tmpdir := t.TempDir() + srcArchive := filepath.Join(tmpdir, "src-archive") + dstDb := filepath.Join(tmpdir, "dst-archive.db") + + // Copy test data + testDataPath := "../../pkg/archive/testdata/archive" + if _, err := os.Stat(testDataPath); os.IsNotExist(err) { + t.Skip("Test data not found, skipping integration test") + } + + if err := util.CopyDir(testDataPath, srcArchive); err != nil { + t.Fatalf("Failed to copy test data: %s", err.Error()) + } + + // Initialize backends + srcConfig := fmt.Sprintf(`{"kind":"file","path":"%s"}`, srcArchive) + srcBackend, err := archive.InitBackend(json.RawMessage(srcConfig)) + if err != nil { + t.Fatalf("Failed to initialize source backend: %s", err.Error()) + } + + dstConfig := fmt.Sprintf(`{"kind":"sqlite","dbPath":"%s"}`, dstDb) + dstBackend, err := archive.InitBackend(json.RawMessage(dstConfig)) + if err != nil { + t.Fatalf("Failed to initialize destination backend: %s", err.Error()) + } + + // First import + imported1, _, err := importArchive(srcBackend, dstBackend) + if err != nil { + t.Fatalf("First import failed: 
%s", err.Error()) + } + + // Second import (should skip all jobs) + imported2, _, err := importArchive(srcBackend, dstBackend) + if err != nil { + t.Errorf("Second import failed: %s", err.Error()) + } + + if imported2 != 0 { + t.Errorf("Second import should skip all jobs, but imported %d", imported2) + } + + t.Logf("First import: %d jobs, Second import: %d jobs (all skipped as expected)", imported1, imported2) +} + +// TestJobStub is a helper test to verify that the job stub used in tests matches the schema +func TestJobStub(t *testing.T) { + job := &schema.Job{ + JobID: 123, + Cluster: "test-cluster", + StartTime: 1234567890, + } + + if job.JobID != 123 { + t.Errorf("Expected JobID 123, got %d", job.JobID) + } +} diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go index 5073d5d..350bc81 100644 --- a/tools/archive-manager/main.go +++ b/tools/archive-manager/main.go @@ -9,6 +9,8 @@ import ( "flag" "fmt" "os" + "sync" + "sync/atomic" "time" "github.com/ClusterCockpit/cc-backend/internal/config" @@ -31,9 +33,106 @@ func parseDate(in string) int64 { return 0 } +// importArchive imports all jobs from a source archive backend to a destination archive backend. +// It uses parallel processing with a worker pool to improve performance. +// Returns the number of successfully imported jobs, failed jobs, and any error encountered. 
+func importArchive(srcBackend, dstBackend archive.ArchiveBackend) (int, int, error) { + cclog.Info("Starting parallel archive import...") + + // Use atomic counters for thread-safe updates + var imported int32 + var failed int32 + var skipped int32 + + // Number of parallel workers + numWorkers := 4 + cclog.Infof("Using %d parallel workers", numWorkers) + + // Create channels for job distribution + jobs := make(chan archive.JobContainer, numWorkers*2) + + // WaitGroup to track worker completion + var wg sync.WaitGroup + + // Start worker goroutines + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + for job := range jobs { + // Validate job metadata + if job.Meta == nil { + cclog.Warn("Skipping job with nil metadata") + atomic.AddInt32(&failed, 1) + continue + } + + // Validate job data + if job.Data == nil { + cclog.Warnf("Job %d from cluster %s has no metric data, skipping", + job.Meta.JobID, job.Meta.Cluster) + atomic.AddInt32(&failed, 1) + continue + } + + // Check if job already exists in destination + if dstBackend.Exists(job.Meta) { + cclog.Debugf("Job %d (cluster: %s, start: %d) already exists in destination, skipping", + job.Meta.JobID, job.Meta.Cluster, job.Meta.StartTime) + atomic.AddInt32(&skipped, 1) + continue + } + + // Import job to destination + if err := dstBackend.ImportJob(job.Meta, job.Data); err != nil { + cclog.Errorf("Failed to import job %d from cluster %s: %s", + job.Meta.JobID, job.Meta.Cluster, err.Error()) + atomic.AddInt32(&failed, 1) + continue + } + + // Successfully imported + newCount := atomic.AddInt32(&imported, 1) + if newCount%100 == 0 { + cclog.Infof("Progress: %d jobs imported, %d skipped, %d failed", + newCount, atomic.LoadInt32(&skipped), atomic.LoadInt32(&failed)) + } + } + }(i) + } + + // Feed jobs to workers + go func() { + for job := range srcBackend.Iter(true) { + jobs <- job + } + close(jobs) + }() + + // Wait for all workers to complete + wg.Wait() + + finalImported := 
int(atomic.LoadInt32(&imported)) + finalFailed := int(atomic.LoadInt32(&failed)) + finalSkipped := int(atomic.LoadInt32(&skipped)) + + cclog.Infof("Import completed: %d jobs imported, %d skipped, %d failed", + finalImported, finalSkipped, finalFailed) + + if finalFailed > 0 { + return finalImported, finalFailed, fmt.Errorf("%d jobs failed to import", finalFailed) + } + + return finalImported, finalFailed, nil +} + + + func main() { var srcPath, flagConfigFile, flagLogLevel, flagRemoveCluster, flagRemoveAfter, flagRemoveBefore string - var flagLogDateTime, flagValidate bool + var flagSrcConfig, flagDstConfig string + var flagLogDateTime, flagValidate, flagImport bool flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") @@ -43,14 +142,54 @@ func main() { flag.StringVar(&flagRemoveBefore, "remove-before", "", "Remove all jobs with start time before date (Format: 2006-Jan-04)") flag.StringVar(&flagRemoveAfter, "remove-after", "", "Remove all jobs with start time after date (Format: 2006-Jan-04)") flag.BoolVar(&flagValidate, "validate", false, "Set this flag to validate a job archive against the json schema") + flag.BoolVar(&flagImport, "import", false, "Import jobs from source archive to destination archive") + flag.StringVar(&flagSrcConfig, "src-config", "", "Source archive backend configuration (JSON), e.g. '{\"kind\":\"file\",\"path\":\"./archive\"}'") + flag.StringVar(&flagDstConfig, "dst-config", "", "Destination archive backend configuration (JSON), e.g. 
'{\"kind\":\"sqlite\",\"dbPath\":\"./archive.db\"}'") flag.Parse() + archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", srcPath) cclog.Init(flagLogLevel, flagLogDateTime) + // Handle import mode + if flagImport { + if flagSrcConfig == "" || flagDstConfig == "" { + cclog.Fatal("Both --src-config and --dst-config must be specified for import mode") + } + + cclog.Info("Import mode: initializing source and destination backends...") + + // Initialize source backend + srcBackend, err := archive.InitBackend(json.RawMessage(flagSrcConfig)) + if err != nil { + cclog.Fatalf("Failed to initialize source backend: %s", err.Error()) + } + cclog.Info("Source backend initialized successfully") + + // Initialize destination backend + dstBackend, err := archive.InitBackend(json.RawMessage(flagDstConfig)) + if err != nil { + cclog.Fatalf("Failed to initialize destination backend: %s", err.Error()) + } + cclog.Info("Destination backend initialized successfully") + + // Perform import + imported, failed, err := importArchive(srcBackend, dstBackend) + if err != nil { + cclog.Errorf("Import completed with errors: %s", err.Error()) + if failed > 0 { + os.Exit(1) + } + } + + cclog.Infof("Import finished successfully: %d jobs imported", imported) + os.Exit(0) + } + ccconf.Init(flagConfigFile) + // Load and check main configuration if cfg := ccconf.GetPackageConfig("main"); cfg != nil { if clustercfg := ccconf.GetPackageConfig("clusters"); clustercfg != nil { diff --git a/tools/archive-migration/README.md b/tools/archive-migration/README.md new file mode 100644 index 0000000..0a5b711 --- /dev/null +++ b/tools/archive-migration/README.md @@ -0,0 +1,133 @@ +# Archive Migration Tool + +## Overview + +The `archive-migration` tool migrates job archives from old schema versions to the current schema version. It handles schema changes such as the `exclusive` → `shared` field transformation and adds/removes fields as needed. 
+ +## Features + +- **Parallel Processing**: Uses worker pool for fast migration +- **Dry-Run Mode**: Preview changes without modifying files +- **Safe Transformations**: Applies well-defined schema transformations +- **Progress Reporting**: Shows real-time migration progress +- **Error Handling**: Continues on individual failures, reports at end + +## Schema Transformations + +### Exclusive → Shared +Converts the old `exclusive` integer field to the new `shared` string field: +- `0` → `"multi_user"` +- `1` → `"none"` +- `2` → `"single_user"` + +### Missing Fields +Adds fields required by current schema: +- `submitTime`: Defaults to `startTime` if missing +- `energy`: Defaults to `0.0` +- `requestedMemory`: Defaults to `0` +- `shared`: Defaults to `"none"` if still missing after transformation + +### Deprecated Fields +Removes fields no longer in schema: +- `mem_used_max`, `flops_any_avg`, `mem_bw_avg` +- `load_avg`, `net_bw_avg`, `net_data_vol_total` +- `file_bw_avg`, `file_data_vol_total` + +## Usage + +### Build +```bash +cd tools/archive-migration +go build +``` + +### Dry Run (Preview Changes) +```bash +./archive-migration --archive /path/to/archive --dry-run +``` + +### Migrate Archive +```bash +# IMPORTANT: Backup your archive first! 
+cp -r /path/to/archive /path/to/archive-backup + +# Run migration +./archive-migration --archive /path/to/archive +``` + +### Command-Line Options + +- `--archive `: Path to job archive (required) +- `--dry-run`: Preview changes without modifying files +- `--workers `: Number of parallel workers (default: 4) +- `--loglevel `: Logging level: debug, info, warn, err, fatal, crit (default: info) +- `--logdate`: Add timestamps to log messages + +## Examples + +```bash +# Preview what would change +./archive-migration --archive ./var/job-archive --dry-run + +# Migrate with verbose logging +./archive-migration --archive ./var/job-archive --loglevel debug + +# Migrate with 8 workers for faster processing +./archive-migration --archive ./var/job-archive --workers 8 +``` + +## Safety + +> [!CAUTION] +> **Always backup your archive before running migration!** + +The tool modifies `meta.json` files in place. While transformations are designed to be safe, unexpected issues could occur. Follow these safety practices: + +1. **Always run with `--dry-run` first** to preview changes +2. **Backup your archive** before migration +3. **Test on a copy** of your archive first +4. 
**Verify results** after migration + +## Verification + +After migration, verify the archive: + +```bash +# Use archive-manager to check the archive +cd ../archive-manager +./archive-manager -s /path/to/migrated-archive + +# Or validate specific jobs +./archive-manager -s /path/to/migrated-archive --validate +``` + +## Troubleshooting + +### Migration Failures + +If individual jobs fail to migrate: +- Check the error messages for specific files +- Examine the failing `meta.json` files manually +- Fix invalid JSON or unexpected field types +- Re-run migration (already-migrated jobs will be processed again) + +### Performance + +For large archives: +- Increase `--workers` for more parallelism +- Use `--loglevel warn` to reduce log output +- Monitor disk I/O if migration is slow + +## Technical Details + +The migration process: +1. Walks archive directory recursively +2. Finds all `meta.json` files +3. Distributes jobs to worker pool +4. For each job: + - Reads JSON file + - Applies transformations in order + - Writes back migrated data (if not dry-run) +5. Reports statistics and errors + +Transformations are idempotent - running migration multiple times is safe (though not recommended for performance). diff --git a/tools/archive-migration/archive-migration b/tools/archive-migration/archive-migration new file mode 100755 index 0000000..cd73728 Binary files /dev/null and b/tools/archive-migration/archive-migration differ diff --git a/tools/archive-migration/main.go b/tools/archive-migration/main.go new file mode 100644 index 0000000..edd48f4 --- /dev/null +++ b/tools/archive-migration/main.go @@ -0,0 +1,67 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package main + +import ( + "flag" + "fmt" + "os" + + cclog "github.com/ClusterCockpit/cc-lib/ccLogger" +) + +func main() { + var archivePath string + var dryRun bool + var numWorkers int + var flagLogLevel string + var flagLogDateTime bool + + flag.StringVar(&archivePath, "archive", "", "Path to job archive to migrate (required)") + flag.BoolVar(&dryRun, "dry-run", false, "Preview changes without modifying files") + flag.IntVar(&numWorkers, "workers", 4, "Number of parallel workers") + flag.StringVar(&flagLogLevel, "loglevel", "info", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`") + flag.BoolVar(&flagLogDateTime, "logdate", false, "Add date and time to log messages") + flag.Parse() + + // Initialize logger + cclog.Init(flagLogLevel, flagLogDateTime) + + // Validate inputs + if archivePath == "" { + fmt.Fprintf(os.Stderr, "Error: --archive flag is required\n\n") + flag.Usage() + os.Exit(1) + } + + // Check if archive path exists + if _, err := os.Stat(archivePath); os.IsNotExist(err) { + cclog.Fatalf("Archive path does not exist: %s", archivePath) + } + + // Display warning for non-dry-run mode + if !dryRun { + cclog.Warn("WARNING: This will modify files in the archive!") + cclog.Warn("It is strongly recommended to backup your archive first.") + cclog.Warn("Run with --dry-run first to preview changes.") + cclog.Info("") + } + + // Run migration + migrated, failed, err := migrateArchive(archivePath, dryRun, numWorkers) + + if err != nil { + cclog.Errorf("Migration completed with errors: %s", err.Error()) + if failed > 0 { + os.Exit(1) + } + } + + if dryRun { + cclog.Infof("Dry run completed: %d jobs would be migrated", migrated) + } else { + cclog.Infof("Migration completed successfully: %d jobs migrated", migrated) + } +} diff --git a/tools/archive-migration/transforms.go b/tools/archive-migration/transforms.go new file mode 100644 index 0000000..5d8798f --- /dev/null +++ b/tools/archive-migration/transforms.go @@ -0,0 +1,232 @@ +// 
Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package main + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "sync/atomic" + + cclog "github.com/ClusterCockpit/cc-lib/ccLogger" +) + +// transformExclusiveToShared converts the old 'exclusive' field to the new 'shared' field +// Mapping: 0 -> "multi_user", 1 -> "none", 2 -> "single_user" +func transformExclusiveToShared(jobData map[string]interface{}) error { + // Check if 'exclusive' field exists + if exclusive, ok := jobData["exclusive"]; ok { + var exclusiveVal int + + // Handle both int and float64 (JSON unmarshaling can produce float64) + switch v := exclusive.(type) { + case float64: + exclusiveVal = int(v) + case int: + exclusiveVal = v + default: + return fmt.Errorf("exclusive field has unexpected type: %T", exclusive) + } + + // Map exclusive to shared + var shared string + switch exclusiveVal { + case 0: + shared = "multi_user" + case 1: + shared = "none" + case 2: + shared = "single_user" + default: + return fmt.Errorf("invalid exclusive value: %d", exclusiveVal) + } + + // Add shared field and remove exclusive + jobData["shared"] = shared + delete(jobData, "exclusive") + + cclog.Debugf("Transformed exclusive=%d to shared=%s", exclusiveVal, shared) + } + + return nil +} + +// addMissingFields adds fields that are required in the current schema but might be missing in old archives +func addMissingFields(jobData map[string]interface{}) error { + // Add submitTime if missing (default to startTime) + if _, ok := jobData["submitTime"]; !ok { + if startTime, ok := jobData["startTime"]; ok { + jobData["submitTime"] = startTime + cclog.Debug("Added submitTime (defaulted to startTime)") + } + } + + // Add energy if missing (default to 0.0) + if _, ok := jobData["energy"]; !ok { + jobData["energy"] = 0.0 + } + + // Add 
requestedMemory if missing (default to 0) + if _, ok := jobData["requestedMemory"]; !ok { + jobData["requestedMemory"] = 0 + } + + // Ensure shared field exists (if still missing, default to "none") + if _, ok := jobData["shared"]; !ok { + jobData["shared"] = "none" + cclog.Debug("Added default shared field: none") + } + + return nil +} + +// removeDeprecatedFields removes fields that are no longer in the current schema +func removeDeprecatedFields(jobData map[string]interface{}) error { + // List of deprecated fields to remove + deprecatedFields := []string{ + "mem_used_max", + "flops_any_avg", + "mem_bw_avg", + "load_avg", + "net_bw_avg", + "net_data_vol_total", + "file_bw_avg", + "file_data_vol_total", + } + + for _, field := range deprecatedFields { + if _, ok := jobData[field]; ok { + delete(jobData, field) + cclog.Debugf("Removed deprecated field: %s", field) + } + } + + return nil +} + +// migrateJobMetadata applies all transformations to a job metadata map +func migrateJobMetadata(jobData map[string]interface{}) error { + // Apply transformations in order + if err := transformExclusiveToShared(jobData); err != nil { + return fmt.Errorf("transformExclusiveToShared failed: %w", err) + } + + if err := addMissingFields(jobData); err != nil { + return fmt.Errorf("addMissingFields failed: %w", err) + } + + if err := removeDeprecatedFields(jobData); err != nil { + return fmt.Errorf("removeDeprecatedFields failed: %w", err) + } + + return nil +} + +// processJob reads, migrates, and writes a job metadata file +func processJob(metaPath string, dryRun bool) error { + // Read the meta.json file + data, err := os.ReadFile(metaPath) + if err != nil { + return fmt.Errorf("failed to read %s: %w", metaPath, err) + } + + // Parse JSON + var jobData map[string]interface{} + if err := json.Unmarshal(data, &jobData); err != nil { + return fmt.Errorf("failed to parse JSON from %s: %w", metaPath, err) + } + + // Apply migrations + if err := migrateJobMetadata(jobData); err != 
nil { + return fmt.Errorf("migration failed for %s: %w", metaPath, err) + } + + // If dry-run, just report what would change + if dryRun { + cclog.Infof("Would migrate: %s", metaPath) + return nil + } + + // Write back the migrated data + migratedData, err := json.MarshalIndent(jobData, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal migrated data: %w", err) + } + + if err := os.WriteFile(metaPath, migratedData, 0644); err != nil { + return fmt.Errorf("failed to write %s: %w", metaPath, err) + } + + return nil +} + +// migrateArchive walks through an archive directory and migrates all meta.json files +func migrateArchive(archivePath string, dryRun bool, numWorkers int) (int, int, error) { + cclog.Infof("Starting archive migration at %s", archivePath) + if dryRun { + cclog.Info("DRY RUN MODE - no files will be modified") + } + + var migrated int32 + var failed int32 + + // Channel for job paths + jobs :=make(chan string, numWorkers*2) + var wg sync.WaitGroup + + // Start worker goroutines + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + for metaPath := range jobs { + if err := processJob(metaPath, dryRun); err != nil { + cclog.Errorf("Failed to migrate %s: %s", metaPath, err.Error()) + atomic.AddInt32(&failed, 1) + continue + } + + newCount := atomic.AddInt32(&migrated, 1) + if newCount%100 == 0 { + cclog.Infof("Progress: %d jobs migrated, %d failed", newCount, atomic.LoadInt32(&failed)) + } + } + }(i) + } + + // Walk the archive directory and find all meta.json files + go func() { + filepath.Walk(archivePath, func(path string, info os.FileInfo, err error) error { + if err != nil { + cclog.Errorf("Error accessing path %s: %s", path, err.Error()) + return nil // Continue walking + } + + if !info.IsDir() && info.Name() == "meta.json" { + jobs <- path + } + + return nil + }) + close(jobs) + }() + + // Wait for all workers to complete + wg.Wait() + + finalMigrated := int(atomic.LoadInt32(&migrated)) + 
finalFailed := int(atomic.LoadInt32(&failed)) + + cclog.Infof("Migration completed: %d jobs migrated, %d failed", finalMigrated, finalFailed) + + if finalFailed > 0 { + return finalMigrated, finalFailed, fmt.Errorf("%d jobs failed to migrate", finalFailed) + } + + return finalMigrated, finalFailed, nil +}