From 7cff8bbfd22422ab9d0cea85d2766a8961d18565 Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Thu, 4 Dec 2025 15:07:09 +0100
Subject: [PATCH 1/3] Add documentation for importer

---
 internal/importer/README.md         | 132 +++++++++++++++++
 internal/importer/handleImport.go   |  89 +++---------
 internal/importer/importer_test.go  |  20 +++
 internal/importer/initDB.go         | 218 ++++++++++++++++++----------
 internal/importer/normalize.go      |  47 ++++++
 internal/importer/normalize_test.go |   4 +
 6 files changed, 362 insertions(+), 148 deletions(-)
 create mode 100644 internal/importer/README.md

diff --git a/internal/importer/README.md b/internal/importer/README.md
new file mode 100644
index 0000000..75a8016
--- /dev/null
+++ b/internal/importer/README.md
@@ -0,0 +1,132 @@
+# Importer Package
+
+The `importer` package provides functionality for importing job data into the ClusterCockpit database from archived job files.
+
+## Overview
+
+This package supports two primary import workflows:
+
+1. **Bulk Database Initialization** - Reinitialize the entire job database from archived jobs
+2. **Individual Job Import** - Import specific jobs from metadata/data file pairs
+
+Both workflows enrich job metadata by calculating performance footprints and energy consumption metrics before persisting to the database.
+
+## Main Entry Points
+
+### InitDB()
+
+Reinitializes the job database from all archived jobs.
+
+```go
+if err := importer.InitDB(); err != nil {
+    log.Fatal(err)
+}
+```
+
+This function:
+
+- Flushes existing job, tag, and jobtag tables
+- Iterates through all jobs in the configured archive
+- Enriches each job with calculated metrics
+- Inserts jobs into the database in batched transactions (100 jobs per batch)
+- Continues on individual job failures, logging errors
+
+**Use Case**: Initial database setup or complete database rebuild from archive.
+
+### HandleImportFlag(flag string)
+
+Imports jobs from specified file pairs.
+
+```go
+// Format: "<path-to-meta.json>:<path-to-data.json>[,<path-to-meta.json>:<path-to-data.json>,...]"
+flag := "/path/to/meta.json:/path/to/data.json"
+if err := importer.HandleImportFlag(flag); err != nil {
+    log.Fatal(err)
+}
+```
+
+This function:
+
+- Parses the comma-separated file pairs
+- Validates metadata and job data against schemas (if validation is enabled)
+- Enriches each job with footprints and energy metrics
+- Imports jobs into both the archive and database
+- Fails fast on the first error
+
+**Use Case**: Importing specific jobs from external sources or manual job additions.
+
+## Job Enrichment
+
+Both import workflows use `enrichJobMetadata()` to calculate:
+
+### Performance Footprints
+
+Performance footprints are calculated from metric averages based on the subcluster configuration:
+
+```go
+job.Footprint["mem_used_avg"] = 45.2 // GB
+job.Footprint["cpu_load_avg"] = 0.87 // load
+```
+
+### Energy Metrics
+
+Energy consumption is calculated from per-node power metrics using the formula:
+
+```
+Energy (kWh) = (Power (W) × Nodes × Duration (s) / 3600) / 1000
+```
+
+For each energy metric:
+
+```go
+job.EnergyFootprint["acc_power"] = 12.5 // kWh
+job.Energy = 150.2                      // Total energy in kWh
+```
+
+**Note**: Energy calculations for metrics with unit "energy" (Joules) are not yet implemented.
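+
+As a worked example (numbers are illustrative): a job averaging 250 W per node on 4 nodes for 7200 s yields
+
+```
+(250 W × 4 nodes) × (7200 s / 3600) / 1000 = 2.0 kWh
+```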
+
+## Data Validation
+
+### SanityChecks(job *schema.Job)
+
+Validates job metadata before database insertion:
+
+- Cluster exists in configuration
+- Subcluster is valid (assigns if needed)
+- Job state is valid
+- Resources and user fields are populated
+- Node counts and hardware thread counts are positive
+- Resource count matches declared node count
+
+## Normalization Utilities
+
+The package includes utilities for normalizing metric values to appropriate SI prefixes:
+
+### Normalize(avg float64, prefix string)
+
+Adjusts values and SI prefixes for readability:
+
+```go
+factor, newPrefix := importer.Normalize(2048.0, "M")
+// Converts 2048 MB → ~2.0 GB
+// Returns: factor for conversion, "G"
+```
+
+This is useful for automatically scaling metrics (e.g., memory, storage) to human-readable units.
+
+## Dependencies
+
+- `github.com/ClusterCockpit/cc-backend/internal/repository` - Database operations
+- `github.com/ClusterCockpit/cc-backend/pkg/archive` - Job archive access
+- `github.com/ClusterCockpit/cc-lib/schema` - Job schema definitions
+- `github.com/ClusterCockpit/cc-lib/ccLogger` - Logging
+- `github.com/ClusterCockpit/cc-lib/ccUnits` - SI unit handling
+
+## Error Handling
+
+- **InitDB**: Continues processing on individual job failures, logs errors, returns a summary
+- **HandleImportFlag**: Fails fast on the first error, returns immediately
+- Both functions log detailed error context for debugging
+
+## Performance
+
+- **Transaction Batching**: InitDB processes jobs in batches of 100 for optimal database performance
+- **Tag Caching**: Tag IDs are cached during import to minimize database queries
+- **Progress Reporting**: InitDB prints progress updates during bulk operations
diff --git a/internal/importer/handleImport.go b/internal/importer/handleImport.go
index f527781..482b328 100644
--- a/internal/importer/handleImport.go
+++ b/internal/importer/handleImport.go
@@ -8,7 +8,6 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
-	"math"
 	"os"
 	"strings"
 
@@ -19,7 +18,22 @@ import (
 	"github.com/ClusterCockpit/cc-lib/schema"
 )
 
-// Import all jobs specified as `<path-to-meta.json>:<path-to-data.json>,...`
+// HandleImportFlag imports jobs from file pairs specified in a comma-separated flag string.
+//
+// The flag format is: "<path-to-meta.json>:<path-to-data.json>[,<path-to-meta.json>:<path-to-data.json>,...]"
+//
+// For each job pair, this function:
+//  1. Reads and validates the metadata JSON file (schema.Job)
+//  2. Reads and validates the job data JSON file (schema.JobData)
+//  3. Enriches the job with calculated footprints and energy metrics
+//  4. Validates the job using SanityChecks()
+//  5. Imports the job into the archive
+//  6. Inserts the job into the database with associated tags
+//
+// Schema validation is performed if config.Keys.Validate is true.
+//
+// Returns an error if file reading, validation, enrichment, or database operations fail.
+// The function stops processing on the first error encountered.
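+//
+// Example (paths are illustrative):
+//
+//	if err := importer.HandleImportFlag("/path/to/meta.json:/path/to/data.json"); err != nil {
+//		log.Fatal(err)
+//	}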
 func HandleImportFlag(flag string) error {
 	r := repository.GetJobRepository()
 
@@ -72,75 +86,8 @@ func HandleImportFlag(flag string) error {
 
 	job.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
 
-	sc, err := archive.GetSubCluster(job.Cluster, job.SubCluster)
-	if err != nil {
-		cclog.Errorf("cannot get subcluster: %s", err.Error())
-		return err
-	}
-
-	job.Footprint = make(map[string]float64)
-
-	for _, fp := range sc.Footprint {
-		statType := "avg"
-
-		if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
-			statType = sc.MetricConfig[i].Footprint
-		}
-
-		name := fmt.Sprintf("%s_%s", fp, statType)
-
-		job.Footprint[name] = repository.LoadJobStat(&job, fp, statType)
-	}
-
-	job.RawFootprint, err = json.Marshal(job.Footprint)
-	if err != nil {
-		cclog.Warn("Error while marshaling job footprint")
-		return err
-	}
-
-	job.EnergyFootprint = make(map[string]float64)
-
-	// Total Job Energy Outside Loop
-	totalEnergy := 0.0
-	for _, fp := range sc.EnergyFootprint {
-		// Always Init Metric Energy Inside Loop
-		metricEnergy := 0.0
-		if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
-			// Note: For DB data, calculate and save as kWh
-			if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules)
-				cclog.Warnf("Update EnergyFootprint for Job %d and Metric %s on cluster %s: Set to 'energy' in cluster.json: Not implemented, will return 0.0", job.JobID, job.Cluster, fp)
-				// FIXME: Needs sum as stats type
-			} else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt)
-				// Energy: Power (in Watts) * Time (in Seconds)
-				// Unit: (W * (s / 3600)) / 1000 = kWh
-				// Round 2 Digits: round(Energy * 100) / 100
-				// Here: (All-Node Metric Average * Number of Nodes) * (Job Duration in Seconds / 3600) / 1000
-				// Note: Shared Jobs handled correctly since "Node Average" is based on partial resources, while "numNodes" factor is 1
-				rawEnergy := ((repository.LoadJobStat(&job, fp, "avg") * float64(job.NumNodes)) * (float64(job.Duration) / 3600.0)) / 1000.0
-				metricEnergy = math.Round(rawEnergy*100.0) / 100.0
-			}
-		} else {
-			cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, job.ID)
-		}
-
-		job.EnergyFootprint[fp] = metricEnergy
-		totalEnergy += metricEnergy
-	}
-
-	job.Energy = (math.Round(totalEnergy*100.0) / 100.0)
-	if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil {
-		cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", job.ID)
-		return err
-	}
-
-	job.RawResources, err = json.Marshal(job.Resources)
-	if err != nil {
-		cclog.Warn("Error while marshaling job resources")
-		return err
-	}
-	job.RawMetaData, err = json.Marshal(job.MetaData)
-	if err != nil {
-		cclog.Warn("Error while marshaling job metadata")
+	if err = enrichJobMetadata(&job); err != nil {
+		cclog.Errorf("Error enriching job metadata: %v", err)
 		return err
 	}
 
diff --git a/internal/importer/importer_test.go b/internal/importer/importer_test.go
index d6bfd94..2aa007d 100644
--- a/internal/importer/importer_test.go
+++ b/internal/importer/importer_test.go
@@ -20,6 +20,8 @@ import (
 	cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
 )
 
+// copyFile copies a file from source path to destination path.
+// Used by tests to set up test fixtures.
 func copyFile(s string, d string) error {
 	r, err := os.Open(s)
 	if err != nil {
@@ -35,6 +37,14 @@ func copyFile(s string, d string) error {
 	return nil
 }
 
+// setup initializes a test environment for importer tests.
+//
+// Creates a temporary directory with:
+//   - A test job archive with cluster configuration
+//   - A SQLite database initialized with schema
+//   - Configuration files loaded
+//
+// Returns a JobRepository instance for test assertions.
 func setup(t *testing.T) *repository.JobRepository {
 	const testconfig = `{
 	"main": {
@@ -130,6 +140,7 @@ func setup(t *testing.T) *repository.JobRepository {
 	return repository.GetJobRepository()
 }
 
+// Result represents the expected test result for job import verification.
 type Result struct {
 	JobId    int64
 	Cluster  string
@@ -137,6 +148,8 @@ type Result struct {
 	Duration int32
 }
 
+// readResult reads the expected test result from a golden file.
+// Golden files contain the expected job attributes after import.
 func readResult(t *testing.T, testname string) Result {
 	var r Result
 
@@ -154,6 +167,13 @@ func readResult(t *testing.T, testname string) Result {
 	return r
 }
 
+// TestHandleImportFlag tests the HandleImportFlag function with various job import scenarios.
+//
+// The test uses golden files in testdata/ to verify that jobs are correctly:
+//   - Parsed from metadata and data JSON files
+//   - Enriched with footprints and energy metrics
+//   - Inserted into the database
+//   - Retrievable with correct attributes
 func TestHandleImportFlag(t *testing.T) {
 	r := setup(t)
 
diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go
index b81a464..e061734 100644
--- a/internal/importer/initDB.go
+++ b/internal/importer/initDB.go
@@ -2,6 +2,15 @@
 // All rights reserved. This file is part of cc-backend.
 // Use of this source code is governed by a MIT-style
 // license that can be found in the LICENSE file.
+
+// Package importer provides functionality for importing job data into the ClusterCockpit database.
+//
+// The package supports two primary use cases:
+//  1. Bulk database initialization from archived jobs via InitDB()
+//  2. Individual job import from file pairs via HandleImportFlag()
+//
+// Both operations enrich job metadata by calculating footprints and energy metrics
+// before persisting to the database.
 package importer
 
 import (
@@ -22,8 +31,21 @@ const (
 	setTagQuery = "INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)"
 )
 
-// Delete the tables "job", "tag" and "jobtag" from the database and
-// repopulate them using the jobs found in `archive`.
+// InitDB reinitializes the job database from archived job data.
+//
+// This function performs the following operations:
+//  1. Flushes existing job, tag, and jobtag tables
+//  2. Iterates through all jobs in the archive
+//  3. Enriches each job with calculated footprints and energy metrics
+//  4. Inserts jobs and tags into the database in batched transactions
+//
+// Jobs are processed in batches of 100 for optimal performance. The function
+// continues processing even if individual jobs fail, logging errors and
+// returning a summary at the end.
+//
+// Returns an error if database initialization, transaction management, or
+// critical operations fail. Individual job failures are logged but do not
+// stop the overall import process.
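+//
+// Typical usage (mirrors the README example; error handling is up to the caller):
+//
+//	if err := importer.InitDB(); err != nil {
+//		log.Fatal(err)
+//	}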
 func InitDB() error {
 	r := repository.GetJobRepository()
 	if err := r.Flush(); err != nil {
@@ -72,76 +94,7 @@ func InitDB() error {
 
 		jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
 
-		sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster)
-		if err != nil {
-			cclog.Errorf("cannot get subcluster: %s", err.Error())
-			return err
-		}
-
-		jobMeta.Footprint = make(map[string]float64)
-
-		for _, fp := range sc.Footprint {
-			statType := "avg"
-
-			if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil {
-				statType = sc.MetricConfig[i].Footprint
-			}
-
-			name := fmt.Sprintf("%s_%s", fp, statType)
-
-			jobMeta.Footprint[name] = repository.LoadJobStat(jobMeta, fp, statType)
-		}
-
-		jobMeta.RawFootprint, err = json.Marshal(jobMeta.Footprint)
-		if err != nil {
-			cclog.Warn("Error while marshaling job footprint")
-			return err
-		}
-
-		jobMeta.EnergyFootprint = make(map[string]float64)
-
-		// Total Job Energy Outside Loop
-		totalEnergy := 0.0
-		for _, fp := range sc.EnergyFootprint {
-			// Always Init Metric Energy Inside Loop
-			metricEnergy := 0.0
-			if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
-				// Note: For DB data, calculate and save as kWh
-				if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules)
-					cclog.Warnf("Update EnergyFootprint for Job %d and Metric %s on cluster %s: Set to 'energy' in cluster.json: Not implemented, will return 0.0", jobMeta.JobID, jobMeta.Cluster, fp)
-					// FIXME: Needs sum as stats type
-				} else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt)
-					// Energy: Power (in Watts) * Time (in Seconds)
-					// Unit: (W * (s / 3600)) / 1000 = kWh
-					// Round 2 Digits: round(Energy * 100) / 100
-					// Here: (All-Node Metric Average * Number of Nodes) * (Job Duration in Seconds / 3600) / 1000
-					// Note: Shared Jobs handled correctly since "Node Average" is based on partial resources, while "numNodes" factor is 1
-					rawEnergy := ((repository.LoadJobStat(jobMeta, fp, "avg") * float64(jobMeta.NumNodes)) * (float64(jobMeta.Duration) / 3600.0)) / 1000.0
-					metricEnergy = math.Round(rawEnergy*100.0) / 100.0
-				}
-			} else {
-				cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID)
-			}
-
-			jobMeta.EnergyFootprint[fp] = metricEnergy
-			totalEnergy += metricEnergy
-		}
-
-		jobMeta.Energy = (math.Round(totalEnergy*100.0) / 100.0)
-		if jobMeta.RawEnergyFootprint, err = json.Marshal(jobMeta.EnergyFootprint); err != nil {
-			cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", jobMeta.ID)
-			return err
-		}
-
-		jobMeta.RawResources, err = json.Marshal(jobMeta.Resources)
-		if err != nil {
-			cclog.Errorf("repository initDB(): %v", err)
-			errorOccured++
-			continue
-		}
-
-		jobMeta.RawMetaData, err = json.Marshal(jobMeta.MetaData)
-		if err != nil {
+		if err := enrichJobMetadata(jobMeta); err != nil {
 			cclog.Errorf("repository initDB(): %v", err)
 			errorOccured++
 			continue
 		}
@@ -163,9 +116,9 @@ func InitDB() error {
 		for _, tag := range jobMeta.Tags {
 			tagstr := tag.Name + ":" + tag.Type
 
-			tagId, ok := tags[tagstr]
+			tagID, ok := tags[tagstr]
 			if !ok {
-				tagId, err = r.TransactionAdd(t,
+				tagID, err = r.TransactionAdd(t,
 					addTagQuery,
 					tag.Name, tag.Type)
 				if err != nil {
@@ -173,12 +126,12 @@ func InitDB() error {
 					errorOccured++
 					continue
 				}
-				tags[tagstr] = tagId
+				tags[tagstr] = tagID
 			}
 
 			r.TransactionAdd(t,
 				setTagQuery,
-				id, tagId)
+				id, tagID)
 		}
 
 		if err == nil {
@@ -195,7 +148,110 @@ func InitDB() error {
 	return nil
 }
 
-// This function also sets the subcluster if necessary!
+// enrichJobMetadata calculates and populates job footprints, energy metrics, and serialized fields.
+//
+// This function performs the following enrichment operations:
+//  1. Calculates job footprint metrics based on the subcluster configuration
+//  2. Computes energy footprint and total energy consumption in kWh
+//  3. Marshals footprints, resources, and metadata into JSON for database storage
+//
+// The function expects the job's MonitoringStatus and SubCluster to be already set.
+// Energy calculations convert per-node power metrics (Watts) to energy (kWh) using the formula:
+//
+//	Energy (kWh) = (Power (W) * Nodes * Duration (s) / 3600) / 1000
+//
+// Returns an error if subcluster retrieval, metric indexing, or JSON marshaling fails.
+func enrichJobMetadata(job *schema.Job) error {
+	sc, err := archive.GetSubCluster(job.Cluster, job.SubCluster)
+	if err != nil {
+		cclog.Errorf("cannot get subcluster: %s", err.Error())
+		return err
+	}
+
+	job.Footprint = make(map[string]float64)
+
+	for _, fp := range sc.Footprint {
+		statType := "avg"
+
+		if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
+			statType = sc.MetricConfig[i].Footprint
+		}
+
+		name := fmt.Sprintf("%s_%s", fp, statType)
+
+		job.Footprint[name] = repository.LoadJobStat(job, fp, statType)
+	}
+
+	job.RawFootprint, err = json.Marshal(job.Footprint)
+	if err != nil {
+		cclog.Warn("Error while marshaling job footprint")
+		return err
+	}
+
+	job.EnergyFootprint = make(map[string]float64)
+
+	// Total Job Energy Outside Loop
+	totalEnergy := 0.0
+	for _, fp := range sc.EnergyFootprint {
+		// Always Init Metric Energy Inside Loop
+		metricEnergy := 0.0
+		if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil {
+			// Note: For DB data, calculate and save as kWh
+			switch sc.MetricConfig[i].Energy {
+			case "energy": // this metric has energy as unit (Joules)
+				cclog.Warnf("Update EnergyFootprint for Job %d and Metric %s on cluster %s: Set to 'energy' in cluster.json: Not implemented, will return 0.0", job.JobID, fp, job.Cluster)
+				// FIXME: Needs sum as stats type
+			case "power": // this metric has power as unit (Watt)
+				// Energy: Power (in Watts) * Time (in Seconds)
+				// Unit: (W * (s / 3600)) / 1000 = kWh
+				// Round 2 Digits: round(Energy * 100) / 100
+				// Here: (All-Node Metric Average * Number of Nodes) * (Job Duration in Seconds / 3600) / 1000
+				// Note: Shared Jobs handled correctly since "Node Average" is based on partial resources, while "numNodes" factor is 1
+				rawEnergy := ((repository.LoadJobStat(job, fp, "avg") * float64(job.NumNodes)) * (float64(job.Duration) / 3600.0)) / 1000.0
+				metricEnergy = math.Round(rawEnergy*100.0) / 100.0
+			}
+		} else {
+			cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, job.ID)
+		}
+
+		job.EnergyFootprint[fp] = metricEnergy
+		totalEnergy += metricEnergy
+	}
+
+	job.Energy = (math.Round(totalEnergy*100.0) / 100.0)
+	if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil {
+		cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", job.ID)
+		return err
+	}
+
+	job.RawResources, err = json.Marshal(job.Resources)
+	if err != nil {
+		cclog.Warn("Error while marshaling job resources")
+		return err
+	}
+
+	job.RawMetaData, err = json.Marshal(job.MetaData)
+	if err != nil {
+		cclog.Warn("Error while marshaling job metadata")
+		return err
+	}
+
+	return nil
+}
+
+// SanityChecks validates job metadata and ensures the cluster/subcluster configuration is valid.
+//
+// This function performs the following validations:
+//  1. Verifies the cluster exists in the archive configuration
+//  2. Assigns and validates the subcluster (may modify job.SubCluster)
+//  3. Validates that the job state is a recognized value
+//  4. Ensures resources and user fields are populated
+//  5. Validates that node counts and hardware thread counts are positive
+//  6. Verifies the number of resources matches the declared node count
+//
+// The function may modify the job's SubCluster field if it needs to be assigned.
+//
+// Returns an error if any validation check fails.
 func SanityChecks(job *schema.Job) error {
 	if c := archive.GetCluster(job.Cluster); c == nil {
 		return fmt.Errorf("no such cluster: %v", job.Cluster)
@@ -220,6 +276,14 @@ func SanityChecks(job *schema.Job) error {
 	return nil
 }
 
+// checkJobData normalizes metric units in job data based on average values.
+//
+// NOTE: This function is currently unused and contains an incomplete implementation.
+// It was intended to normalize byte and file-related metrics to appropriate SI prefixes,
+// but the normalization logic is commented out. Consider removing or completing this
+// function based on project requirements.
+//
+// TODO: Either implement the metric normalization or remove this dead code.
 func checkJobData(d *schema.JobData) error {
 	for _, scopes := range *d {
 		// var newUnit schema.Unit
diff --git a/internal/importer/normalize.go b/internal/importer/normalize.go
index bc72cb3..fc5e537 100644
--- a/internal/importer/normalize.go
+++ b/internal/importer/normalize.go
@@ -10,6 +10,15 @@ import (
 	ccunits "github.com/ClusterCockpit/cc-lib/ccUnits"
 )
 
+// getNormalizationFactor calculates the scaling factor needed to normalize a value
+// to a more readable range (typically between 1.0 and 1000.0).
+//
+// For values greater than 1000, the function scales down by factors of 1000 (returns a negative exponent).
+// For values less than 1.0, the function scales up by factors of 1000 (returns a positive exponent).
+//
+// Returns:
+//   - factor: The multiplicative factor to apply (10^(count*scale))
+//   - exponent: The power of 10 representing the adjustment (a multiple of 3 for SI prefixes)
 func getNormalizationFactor(v float64) (float64, int) {
 	count := 0
 	scale := -3
@@ -29,6 +38,14 @@ func getNormalizationFactor(v float64) (float64, int) {
 	return math.Pow10(count * scale), count * scale
 }
 
+// getExponent calculates the SI prefix exponent from a numeric prefix value.
+//
+// For example:
+//   - Input: 1000.0 (kilo) returns 3
+//   - Input: 1000000.0 (mega) returns 6
+//   - Input: 1000000000.0 (giga) returns 9
+//
+// Returns the exponent representing the power of 10 for the SI prefix.
 func getExponent(p float64) int {
 	count := 0
 
@@ -40,12 +57,42 @@ func getExponent(p float64) int {
 	return count * 3
 }
 
+// newPrefixFromFactor computes a new SI unit prefix after applying a normalization factor.
+//
+// Given an original prefix and an exponent adjustment, this function calculates
+// the resulting SI prefix. For example, if normalizing from bytes (no prefix) by
+// a factor of 10^9, the result would be the "G" (giga) prefix.
+//
+// Parameters:
+//   - op: The original SI prefix value
+//   - e: The exponent adjustment to apply
+//
+// Returns the new SI prefix after adjustment.
 func newPrefixFromFactor(op ccunits.Prefix, e int) ccunits.Prefix {
 	f := float64(op)
 	exp := math.Pow10(getExponent(f) - e)
 	return ccunits.Prefix(exp)
 }
 
+// Normalize adjusts a metric value and its SI unit prefix to a more readable range.
+//
+// This function is useful for automatically scaling metrics to appropriate units.
+// For example, normalizing 2048 MB might result in ~2.0 GB.
+//
+// The function analyzes the average value and determines if a different SI prefix
+// would make the number more human-readable (typically keeping values between 1 and 1000).
+//
+// Parameters:
+//   - avg: The metric value to normalize
+//   - p: The current SI prefix as a string (e.g., "K", "M", "G")
+//
+// Returns:
+//   - factor: The multiplicative factor to apply to convert the value
+//   - newPrefix: The new SI prefix string to use
+//
+// Example:
+//
+//	factor, newPrefix := Normalize(2048.0, "M") // returns the MB->GB conversion factor and "G"
 func Normalize(avg float64, p string) (float64, string) {
 	f, e := getNormalizationFactor(avg)
 
diff --git a/internal/importer/normalize_test.go b/internal/importer/normalize_test.go
index 72017f5..6aa1ed2 100644
--- a/internal/importer/normalize_test.go
+++ b/internal/importer/normalize_test.go
@@ -11,6 +11,8 @@ import (
 	ccunits "github.com/ClusterCockpit/cc-lib/ccUnits"
 )
 
+// TestNormalizeFactor tests the normalization of large byte values to the gigabyte prefix.
+// Verifies that values in the billions are correctly scaled to the "G" (giga) prefix.
 func TestNormalizeFactor(t *testing.T) {
 	// var us string
 	s := []float64{2890031237, 23998994567, 389734042344, 390349424345}
@@ -38,6 +40,8 @@ func TestNormalizeFactor(t *testing.T) {
 	}
 }
 
+// TestNormalizeKeep tests that values already in an appropriate range maintain their prefix.
+// Verifies that when values don't require rescaling, the original "G" prefix is preserved.
 func TestNormalizeKeep(t *testing.T) {
 	s := []float64{3.0, 24.0, 390.0, 391.0}
 

From 7da01975f72c2b295e46b4a3c9c281b7d5d572fc Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Thu, 4 Dec 2025 15:08:03 +0100
Subject: [PATCH 2/3] archive-migration: Add check for archive version and
 rewrite version after migration

---
 tools/archive-migration/main.go       | 43 ++++++++++++++++++++++++++-
 tools/archive-migration/transforms.go | 20 ++++++-------
 2 files changed, 52 insertions(+), 11 deletions(-)

diff --git a/tools/archive-migration/main.go b/tools/archive-migration/main.go
index edd48f4..9bbed12 100644
--- a/tools/archive-migration/main.go
+++ b/tools/archive-migration/main.go
@@ -5,9 +5,12 @@
 package main
 
 import (
+	"bufio"
 	"flag"
 	"fmt"
 	"os"
+	"path/filepath"
+	"strings"
 
 	cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
 )
@@ -41,17 +44,33 @@ func main() {
 		cclog.Fatalf("Archive path does not exist: %s", archivePath)
 	}
 
+	// Check archive version
+	if err := checkVersion(archivePath); err != nil {
+		cclog.Fatalf("Version check failed: %v", err)
+	}
+
 	// Display warning for non-dry-run mode
 	if !dryRun {
 		cclog.Warn("WARNING: This will modify files in the archive!")
 		cclog.Warn("It is strongly recommended to backup your archive first.")
 		cclog.Warn("Run with --dry-run first to preview changes.")
 		cclog.Info("")
+
+		fmt.Print("Are you sure you want to continue? [y/N]: ")
+		reader := bufio.NewReader(os.Stdin)
+		input, err := reader.ReadString('\n')
+		if err != nil {
+			cclog.Fatalf("Error reading input: %v", err)
+		}
+		if strings.ToLower(strings.TrimSpace(input)) != "y" {
+			cclog.Info("Aborted by user.")
+			os.Exit(0)
+		}
 	}
 
 	// Run migration
 	migrated, failed, err := migrateArchive(archivePath, dryRun, numWorkers)
-
+
 	if err != nil {
 		cclog.Errorf("Migration completed with errors: %s", err.Error())
 		if failed > 0 {
@@ -62,6 +81,28 @@ func main() {
 	if dryRun {
 		cclog.Infof("Dry run completed: %d jobs would be migrated", migrated)
 	} else {
+		if err := updateVersion(archivePath); err != nil {
+			cclog.Errorf("Failed to update archive version: %v", err)
+			os.Exit(1)
+		}
 		cclog.Infof("Migration completed successfully: %d jobs migrated", migrated)
 	}
 }
+
+func checkVersion(archivePath string) error {
+	versionFile := filepath.Join(archivePath, "version.txt")
+	data, err := os.ReadFile(versionFile)
+	if err != nil {
+		return fmt.Errorf("failed to read version.txt: %w", err)
+	}
+	versionStr := strings.TrimSpace(string(data))
+	if versionStr != "2" {
+		return fmt.Errorf("archive version is %s, expected 2", versionStr)
+	}
+	return nil
+}
+
+func updateVersion(archivePath string) error {
+	versionFile := filepath.Join(archivePath, "version.txt")
+	return os.WriteFile(versionFile, []byte("3\n"), 0o644)
+}
diff --git a/tools/archive-migration/transforms.go b/tools/archive-migration/transforms.go
index 5d8798f..6558e47 100644
--- a/tools/archive-migration/transforms.go
+++ b/tools/archive-migration/transforms.go
@@ -17,11 +17,11 @@ import (
 
 // transformExclusiveToShared converts the old 'exclusive' field to the new 'shared' field
 // Mapping: 0 -> "multi_user", 1 -> "none", 2 -> "single_user"
-func transformExclusiveToShared(jobData map[string]interface{}) error {
+func transformExclusiveToShared(jobData map[string]any) error {
 	// Check if 'exclusive' field exists
 	if exclusive, ok := jobData["exclusive"]; ok {
 		var exclusiveVal int
-
+
 		// Handle both int and float64 (JSON unmarshaling can produce float64)
 		switch v := exclusive.(type) {
 		case float64:
@@ -48,7 +48,7 @@ func transformExclusiveToShared(jobData map[string]any) error {
 		// Add shared field and remove exclusive
 		jobData["shared"] = shared
 		delete(jobData, "exclusive")
-
+
 		cclog.Debugf("Transformed exclusive=%d to shared=%s", exclusiveVal, shared)
 	}
 
@@ -56,7 +56,7 @@ func transformExclusiveToShared(jobData map[string]any) error {
 }
 
 // addMissingFields adds fields that are required in the current schema but might be missing in old archives
-func addMissingFields(jobData map[string]interface{}) error {
+func addMissingFields(jobData map[string]any) error {
 	// Add submitTime if missing (default to startTime)
 	if _, ok := jobData["submitTime"]; !ok {
 		if startTime, ok := jobData["startTime"]; ok {
@@ -85,7 +85,7 @@ func addMissingFields(jobData map[string]any) error {
 }
 
 // removeDeprecatedFields removes fields that are no longer in the current schema
-func removeDeprecatedFields(jobData map[string]interface{}) error {
+func removeDeprecatedFields(jobData map[string]any) error {
 	// List of deprecated fields to remove
 	deprecatedFields := []string{
 		"mem_used_max",
@@ -109,7 +109,7 @@ func removeDeprecatedFields(jobData map[string]any) error {
 }
 
 // migrateJobMetadata applies all transformations to a job metadata map
-func migrateJobMetadata(jobData map[string]interface{}) error {
+func migrateJobMetadata(jobData map[string]any) error {
 	// Apply transformations in order
 	if err := transformExclusiveToShared(jobData); err != nil {
 		return fmt.Errorf("transformExclusiveToShared failed: %w", err)
@@ -135,7 +135,7 @@ func processJob(metaPath string, dryRun bool) error {
 	}
 
 	// Parse JSON
-	var jobData map[string]interface{}
+	var jobData map[string]any
 	if err := json.Unmarshal(data, &jobData); err != nil {
 		return fmt.Errorf("failed to parse JSON from %s: %w", metaPath, err)
 	}
@@ -157,7 +157,7 @@ func processJob(metaPath string, dryRun bool) error {
 		return fmt.Errorf("failed to marshal migrated data: %w", err)
 	}
 
-	if err := os.WriteFile(metaPath, migratedData, 0644); err != nil {
+	if err := os.WriteFile(metaPath, migratedData, 0o644); err != nil {
 		return fmt.Errorf("failed to write %s: %w", metaPath, err)
 	}
 
@@ -175,11 +175,11 @@ func migrateArchive(archivePath string, dryRun bool, numWorkers int) (int, int,
 	var failed int32
 
 	// Channel for job paths
-	jobs :=make(chan string, numWorkers*2)
+	jobs := make(chan string, numWorkers*2)
 	var wg sync.WaitGroup
 
 	// Start worker goroutines
-	for i := 0; i < numWorkers; i++ {
+	for i := range numWorkers {
 		wg.Add(1)
 		go func(workerID int) {
 			defer wg.Done()

From 03b5272e44518b83565dc13d81fbe39417a7e587 Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Thu, 4 Dec 2025 15:08:22 +0100
Subject: [PATCH 3/3] Upgrade to latest cc-lib

---
 .gitignore | 1 +
 go.mod     | 3 +--
 go.sum     | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/.gitignore b/.gitignore
index 7dba944..ffac45b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,4 @@ test_ccms_write_api.sh
 dist/
 *.db
 .idea
+tools/archive-migration/archive-migration
diff --git a/go.mod b/go.mod
index fa3fba2..20e4631 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@ toolchain go1.24.1
 
 require (
 	github.com/99designs/gqlgen v0.17.84
-	github.com/ClusterCockpit/cc-lib v0.10.2
+	github.com/ClusterCockpit/cc-lib v0.11.0
 	github.com/Masterminds/squirrel v1.5.4
 	github.com/aws/aws-sdk-go-v2 v1.39.6
 	github.com/aws/aws-sdk-go-v2/config v1.31.20
@@ -114,7 +114,6 @@ require (
 	go.uber.org/atomic v1.11.0 // indirect
 	go.yaml.in/yaml/v2 v2.4.3 // indirect
 	go.yaml.in/yaml/v3 v3.0.4 // indirect
-	golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect
 	golang.org/x/mod v0.30.0 // indirect
 	golang.org/x/net v0.47.0 // indirect
 	golang.org/x/sync v0.18.0 // indirect
diff --git a/go.sum b/go.sum
index 91fefb2..7a4831d 100644
--- a/go.sum
+++ b/go.sum
@@ -6,8 +6,8 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25
 github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
 github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8=
 github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
-github.com/ClusterCockpit/cc-lib v0.10.2 h1:0sYW34EEfVLcdmHcZ1uErZepgwcu+HQg38WkBslQjqg=
-github.com/ClusterCockpit/cc-lib v0.10.2/go.mod h1:0LKjDJs813/NMmaSJXJc11A9rxiFDPV/QdWQbZUp0XY=
+github.com/ClusterCockpit/cc-lib v0.11.0 h1:66YkTOxWUak7nB3r7dJEm2q+B0uPRPGj0mwXZHXpOuA=
+github.com/ClusterCockpit/cc-lib v0.11.0/go.mod h1:0LKjDJs813/NMmaSJXJc11A9rxiFDPV/QdWQbZUp0XY=
 github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
 github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
 github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM=