archive-migration: Add check for archive version and rewrite version after migration

This commit is contained in:
2025-12-04 15:08:03 +01:00
parent 7cff8bbfd2
commit 7da01975f7
2 changed files with 52 additions and 11 deletions

View File

@@ -5,9 +5,12 @@
package main package main
import ( import (
"bufio"
"flag" "flag"
"fmt" "fmt"
"os" "os"
"path/filepath"
"strings"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
) )
@@ -41,17 +44,33 @@ func main() {
cclog.Fatalf("Archive path does not exist: %s", archivePath) cclog.Fatalf("Archive path does not exist: %s", archivePath)
} }
// Check archive version
if err := checkVersion(archivePath); err != nil {
cclog.Fatalf("Version check failed: %v", err)
}
// Display warning for non-dry-run mode // Display warning for non-dry-run mode
if !dryRun { if !dryRun {
cclog.Warn("WARNING: This will modify files in the archive!") cclog.Warn("WARNING: This will modify files in the archive!")
cclog.Warn("It is strongly recommended to backup your archive first.") cclog.Warn("It is strongly recommended to backup your archive first.")
cclog.Warn("Run with --dry-run first to preview changes.") cclog.Warn("Run with --dry-run first to preview changes.")
cclog.Info("") cclog.Info("")
fmt.Print("Are you sure you want to continue? [y/N]: ")
reader := bufio.NewReader(os.Stdin)
input, err := reader.ReadString('\n')
if err != nil {
cclog.Fatalf("Error reading input: %v", err)
}
if strings.ToLower(strings.TrimSpace(input)) != "y" {
cclog.Info("Aborted by user.")
os.Exit(0)
}
} }
// Run migration // Run migration
migrated, failed, err := migrateArchive(archivePath, dryRun, numWorkers) migrated, failed, err := migrateArchive(archivePath, dryRun, numWorkers)
if err != nil { if err != nil {
cclog.Errorf("Migration completed with errors: %s", err.Error()) cclog.Errorf("Migration completed with errors: %s", err.Error())
if failed > 0 { if failed > 0 {
@@ -62,6 +81,28 @@ func main() {
if dryRun { if dryRun {
cclog.Infof("Dry run completed: %d jobs would be migrated", migrated) cclog.Infof("Dry run completed: %d jobs would be migrated", migrated)
} else { } else {
if err := updateVersion(archivePath); err != nil {
cclog.Errorf("Failed to update archive version: %v", err)
os.Exit(1)
}
cclog.Infof("Migration completed successfully: %d jobs migrated", migrated) cclog.Infof("Migration completed successfully: %d jobs migrated", migrated)
} }
} }
func checkVersion(archivePath string) error {
versionFile := filepath.Join(archivePath, "version.txt")
data, err := os.ReadFile(versionFile)
if err != nil {
return fmt.Errorf("failed to read version.txt: %v", err)
}
versionStr := strings.TrimSpace(string(data))
if versionStr != "2" {
return fmt.Errorf("archive version is %s, expected 2", versionStr)
}
return nil
}
func updateVersion(archivePath string) error {
versionFile := filepath.Join(archivePath, "version.txt")
return os.WriteFile(versionFile, []byte("3\n"), 0644)
}

View File

@@ -17,11 +17,11 @@ import (
// transformExclusiveToShared converts the old 'exclusive' field to the new 'shared' field // transformExclusiveToShared converts the old 'exclusive' field to the new 'shared' field
// Mapping: 0 -> "multi_user", 1 -> "none", 2 -> "single_user" // Mapping: 0 -> "multi_user", 1 -> "none", 2 -> "single_user"
func transformExclusiveToShared(jobData map[string]interface{}) error { func transformExclusiveToShared(jobData map[string]any) error {
// Check if 'exclusive' field exists // Check if 'exclusive' field exists
if exclusive, ok := jobData["exclusive"]; ok { if exclusive, ok := jobData["exclusive"]; ok {
var exclusiveVal int var exclusiveVal int
// Handle both int and float64 (JSON unmarshaling can produce float64) // Handle both int and float64 (JSON unmarshaling can produce float64)
switch v := exclusive.(type) { switch v := exclusive.(type) {
case float64: case float64:
@@ -48,7 +48,7 @@ func transformExclusiveToShared(jobData map[string]interface{}) error {
// Add shared field and remove exclusive // Add shared field and remove exclusive
jobData["shared"] = shared jobData["shared"] = shared
delete(jobData, "exclusive") delete(jobData, "exclusive")
cclog.Debugf("Transformed exclusive=%d to shared=%s", exclusiveVal, shared) cclog.Debugf("Transformed exclusive=%d to shared=%s", exclusiveVal, shared)
} }
@@ -56,7 +56,7 @@ func transformExclusiveToShared(jobData map[string]interface{}) error {
} }
// addMissingFields adds fields that are required in the current schema but might be missing in old archives // addMissingFields adds fields that are required in the current schema but might be missing in old archives
func addMissingFields(jobData map[string]interface{}) error { func addMissingFields(jobData map[string]any) error {
// Add submitTime if missing (default to startTime) // Add submitTime if missing (default to startTime)
if _, ok := jobData["submitTime"]; !ok { if _, ok := jobData["submitTime"]; !ok {
if startTime, ok := jobData["startTime"]; ok { if startTime, ok := jobData["startTime"]; ok {
@@ -85,7 +85,7 @@ func addMissingFields(jobData map[string]interface{}) error {
} }
// removeDeprecatedFields removes fields that are no longer in the current schema // removeDeprecatedFields removes fields that are no longer in the current schema
func removeDeprecatedFields(jobData map[string]interface{}) error { func removeDeprecatedFields(jobData map[string]any) error {
// List of deprecated fields to remove // List of deprecated fields to remove
deprecatedFields := []string{ deprecatedFields := []string{
"mem_used_max", "mem_used_max",
@@ -109,7 +109,7 @@ func removeDeprecatedFields(jobData map[string]interface{}) error {
} }
// migrateJobMetadata applies all transformations to a job metadata map // migrateJobMetadata applies all transformations to a job metadata map
func migrateJobMetadata(jobData map[string]interface{}) error { func migrateJobMetadata(jobData map[string]any) error {
// Apply transformations in order // Apply transformations in order
if err := transformExclusiveToShared(jobData); err != nil { if err := transformExclusiveToShared(jobData); err != nil {
return fmt.Errorf("transformExclusiveToShared failed: %w", err) return fmt.Errorf("transformExclusiveToShared failed: %w", err)
@@ -135,7 +135,7 @@ func processJob(metaPath string, dryRun bool) error {
} }
// Parse JSON // Parse JSON
var jobData map[string]interface{} var jobData map[string]any
if err := json.Unmarshal(data, &jobData); err != nil { if err := json.Unmarshal(data, &jobData); err != nil {
return fmt.Errorf("failed to parse JSON from %s: %w", metaPath, err) return fmt.Errorf("failed to parse JSON from %s: %w", metaPath, err)
} }
@@ -157,7 +157,7 @@ func processJob(metaPath string, dryRun bool) error {
return fmt.Errorf("failed to marshal migrated data: %w", err) return fmt.Errorf("failed to marshal migrated data: %w", err)
} }
if err := os.WriteFile(metaPath, migratedData, 0644); err != nil { if err := os.WriteFile(metaPath, migratedData, 0o644); err != nil {
return fmt.Errorf("failed to write %s: %w", metaPath, err) return fmt.Errorf("failed to write %s: %w", metaPath, err)
} }
@@ -175,11 +175,11 @@ func migrateArchive(archivePath string, dryRun bool, numWorkers int) (int, int,
var failed int32 var failed int32
// Channel for job paths // Channel for job paths
jobs :=make(chan string, numWorkers*2) jobs := make(chan string, numWorkers*2)
var wg sync.WaitGroup var wg sync.WaitGroup
// Start worker goroutines // Start worker goroutines
for i := 0; i < numWorkers; i++ { for i := range numWorkers {
wg.Add(1) wg.Add(1)
go func(workerID int) { go func(workerID int) {
defer wg.Done() defer wg.Done()