diff --git a/internal/importer/README.md b/internal/importer/README.md new file mode 100644 index 0000000..75a8016 --- /dev/null +++ b/internal/importer/README.md @@ -0,0 +1,132 @@ +# Importer Package + +The `importer` package provides functionality for importing job data into the ClusterCockpit database from archived job files. + +## Overview + +This package supports two primary import workflows: + +1. **Bulk Database Initialization** - Reinitialize the entire job database from archived jobs +2. **Individual Job Import** - Import specific jobs from metadata/data file pairs + +Both workflows enrich job metadata by calculating performance footprints and energy consumption metrics before persisting to the database. + +## Main Entry Points + +### InitDB() + +Reinitializes the job database from all archived jobs. + +```go +if err := importer.InitDB(); err != nil { + log.Fatal(err) +} +``` + +This function: +- Flushes existing job, tag, and jobtag tables +- Iterates through all jobs in the configured archive +- Enriches each job with calculated metrics +- Inserts jobs into the database in batched transactions (100 jobs per batch) +- Continues on individual job failures, logging errors + +**Use Case**: Initial database setup or complete database rebuild from archive. + +### HandleImportFlag(flag string) + +Imports jobs from specified file pairs. + +```go +// Format: "<path-to-meta.json>:<path-to-data.json>[,<path-to-meta.json>:<path-to-data.json>,...]" +flag := "/path/to/meta.json:/path/to/data.json" +if err := importer.HandleImportFlag(flag); err != nil { + log.Fatal(err) +} +``` + +This function: +- Parses the comma-separated file pairs +- Validates metadata and job data against schemas (if validation enabled) +- Enriches each job with footprints and energy metrics +- Imports jobs into both the archive and database +- Fails fast on the first error + +**Use Case**: Importing specific jobs from external sources or manual job additions. 
+ +## Job Enrichment + +Both import workflows use `enrichJobMetadata()` to calculate: + +### Performance Footprints + +Performance footprints are calculated from metric averages based on the subcluster configuration: + +```go +job.Footprint["mem_used_avg"] = 45.2 // GB +job.Footprint["cpu_load_avg"] = 0.87 // percentage +``` + +### Energy Metrics + +Energy consumption is calculated from power metrics using the formula: + +``` +Energy (kWh) = (Power (W) × Duration (s) / 3600) / 1000 +``` + +For each energy metric: +```go +job.EnergyFootprint["acc_power"] = 12.5 // kWh +job.Energy = 150.2 // Total energy in kWh +``` + +**Note**: Energy calculations for metrics with unit "energy" (Joules) are not yet implemented. + +## Data Validation + +### SanityChecks(job *schema.Job) + +Validates job metadata before database insertion: + +- Cluster exists in configuration +- Subcluster is valid (assigns if needed) +- Job state is valid +- Resources and user fields are populated +- Node counts and hardware thread counts are positive +- Resource count matches declared node count + +## Normalization Utilities + +The package includes utilities for normalizing metric values to appropriate SI prefixes: + +### Normalize(avg float64, prefix string) + +Adjusts values and SI prefixes for readability: + +```go +factor, newPrefix := importer.Normalize(2048.0, "M") +// Converts 2048 MB → ~2.0 GB +// Returns: factor for conversion, "G" +``` + +This is useful for automatically scaling metrics (e.g., memory, storage) to human-readable units. 
+ +## Dependencies + +- `github.com/ClusterCockpit/cc-backend/internal/repository` - Database operations +- `github.com/ClusterCockpit/cc-backend/pkg/archive` - Job archive access +- `github.com/ClusterCockpit/cc-lib/schema` - Job schema definitions +- `github.com/ClusterCockpit/cc-lib/ccLogger` - Logging +- `github.com/ClusterCockpit/cc-lib/ccUnits` - SI unit handling + +## Error Handling + +- **InitDB**: Continues processing on individual job failures, logs errors, returns summary +- **HandleImportFlag**: Fails fast on first error, returns immediately +- Both functions log detailed error context for debugging + +## Performance + +- **Transaction Batching**: InitDB processes jobs in batches of 100 for optimal database performance +- **Tag Caching**: Tag IDs are cached during import to minimize database queries +- **Progress Reporting**: InitDB prints progress updates during bulk operations diff --git a/internal/importer/handleImport.go b/internal/importer/handleImport.go index f527781..482b328 100644 --- a/internal/importer/handleImport.go +++ b/internal/importer/handleImport.go @@ -8,7 +8,6 @@ import ( "bytes" "encoding/json" "fmt" - "math" "os" "strings" @@ -19,7 +18,22 @@ import ( "github.com/ClusterCockpit/cc-lib/schema" ) -// Import all jobs specified as `<path-to-meta.json>:<path-to-data.json>,...` +// HandleImportFlag imports jobs from file pairs specified in a comma-separated flag string. +// +// The flag format is: "<path-to-meta.json>:<path-to-data.json>[,<path-to-meta.json>:<path-to-data.json>,...]" +// +// For each job pair, this function: +// 1. Reads and validates the metadata JSON file (schema.Job) +// 2. Reads and validates the job data JSON file (schema.JobData) +// 3. Enriches the job with calculated footprints and energy metrics +// 4. Validates the job using SanityChecks() +// 5. Imports the job into the archive +// 6. Inserts the job into the database with associated tags +// +// Schema validation is performed if config.Keys.Validate is true. +// +// Returns an error if file reading, validation, enrichment, or database operations fail. 
+// The function stops processing on the first error encountered. func HandleImportFlag(flag string) error { r := repository.GetJobRepository() @@ -72,75 +86,8 @@ func HandleImportFlag(flag string) error { job.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful - sc, err := archive.GetSubCluster(job.Cluster, job.SubCluster) - if err != nil { - cclog.Errorf("cannot get subcluster: %s", err.Error()) - return err - } - - job.Footprint = make(map[string]float64) - - for _, fp := range sc.Footprint { - statType := "avg" - - if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil { - statType = sc.MetricConfig[i].Footprint - } - - name := fmt.Sprintf("%s_%s", fp, statType) - - job.Footprint[name] = repository.LoadJobStat(&job, fp, statType) - } - - job.RawFootprint, err = json.Marshal(job.Footprint) - if err != nil { - cclog.Warn("Error while marshaling job footprint") - return err - } - - job.EnergyFootprint = make(map[string]float64) - - // Total Job Energy Outside Loop - totalEnergy := 0.0 - for _, fp := range sc.EnergyFootprint { - // Always Init Metric Energy Inside Loop - metricEnergy := 0.0 - if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil { - // Note: For DB data, calculate and save as kWh - if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules) - cclog.Warnf("Update EnergyFootprint for Job %d and Metric %s on cluster %s: Set to 'energy' in cluster.json: Not implemented, will return 0.0", job.JobID, job.Cluster, fp) - // FIXME: Needs sum as stats type - } else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt) - // Energy: Power (in Watts) * Time (in Seconds) - // Unit: (W * (s / 3600)) / 1000 = kWh - // Round 2 Digits: round(Energy * 100) / 100 - // Here: (All-Node Metric Average * Number of Nodes) * (Job Duration in Seconds / 3600) / 1000 - // Note: Shared Jobs handled correctly since "Node Average" is based on partial resources, while "numNodes" factor is 1 - 
rawEnergy := ((repository.LoadJobStat(&job, fp, "avg") * float64(job.NumNodes)) * (float64(job.Duration) / 3600.0)) / 1000.0 - metricEnergy = math.Round(rawEnergy*100.0) / 100.0 - } - } else { - cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, job.ID) - } - - job.EnergyFootprint[fp] = metricEnergy - totalEnergy += metricEnergy - } - - job.Energy = (math.Round(totalEnergy*100.0) / 100.0) - if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil { - cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", job.ID) - return err - } - - job.RawResources, err = json.Marshal(job.Resources) - if err != nil { - cclog.Warn("Error while marshaling job resources") - return err - } - job.RawMetaData, err = json.Marshal(job.MetaData) - if err != nil { - cclog.Warn("Error while marshaling job metadata") + if err = enrichJobMetadata(&job); err != nil { + cclog.Errorf("Error enriching job metadata: %v", err) return err } diff --git a/internal/importer/importer_test.go b/internal/importer/importer_test.go index d6bfd94..2aa007d 100644 --- a/internal/importer/importer_test.go +++ b/internal/importer/importer_test.go @@ -20,6 +20,8 @@ import ( cclog "github.com/ClusterCockpit/cc-lib/ccLogger" ) +// copyFile copies a file from source path to destination path. +// Used by tests to set up test fixtures. func copyFile(s string, d string) error { r, err := os.Open(s) if err != nil { @@ -35,6 +37,14 @@ func copyFile(s string, d string) error { return nil } +// setup initializes a test environment for importer tests. +// +// Creates a temporary directory with: +// - A test job archive with cluster configuration +// - A SQLite database initialized with schema +// - Configuration files loaded +// +// Returns a JobRepository instance for test assertions. 
func setup(t *testing.T) *repository.JobRepository { const testconfig = `{ "main": { @@ -130,6 +140,7 @@ func setup(t *testing.T) *repository.JobRepository { return repository.GetJobRepository() } +// Result represents the expected test result for job import verification. type Result struct { JobId int64 Cluster string @@ -137,6 +148,8 @@ type Result struct { Duration int32 } +// readResult reads the expected test result from a golden file. +// Golden files contain the expected job attributes after import. func readResult(t *testing.T, testname string) Result { var r Result @@ -154,6 +167,13 @@ func readResult(t *testing.T, testname string) Result { return r } +// TestHandleImportFlag tests the HandleImportFlag function with various job import scenarios. +// +// The test uses golden files in testdata/ to verify that jobs are correctly: +// - Parsed from metadata and data JSON files +// - Enriched with footprints and energy metrics +// - Inserted into the database +// - Retrievable with correct attributes func TestHandleImportFlag(t *testing.T) { r := setup(t) diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index b81a464..e061734 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -2,6 +2,15 @@ // All rights reserved. This file is part of cc-backend. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. + +// Package importer provides functionality for importing job data into the ClusterCockpit database. +// +// The package supports two primary use cases: +// 1. Bulk database initialization from archived jobs via InitDB() +// 2. Individual job import from file pairs via HandleImportFlag() +// +// Both operations enrich job metadata by calculating footprints and energy metrics +// before persisting to the database. 
package importer import ( @@ -22,8 +31,21 @@ const ( setTagQuery = "INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)" ) -// Delete the tables "job", "tag" and "jobtag" from the database and -// repopulate them using the jobs found in `archive`. +// InitDB reinitializes the job database from archived job data. +// +// This function performs the following operations: +// 1. Flushes existing job, tag, and jobtag tables +// 2. Iterates through all jobs in the archive +// 3. Enriches each job with calculated footprints and energy metrics +// 4. Inserts jobs and tags into the database in batched transactions +// +// Jobs are processed in batches of 100 for optimal performance. The function +// continues processing even if individual jobs fail, logging errors and +// returning a summary at the end. +// +// Returns an error if database initialization, transaction management, or +// critical operations fail. Individual job failures are logged but do not +// stop the overall import process. func InitDB() error { r := repository.GetJobRepository() if err := r.Flush(); err != nil { @@ -72,76 +94,7 @@ func InitDB() error { jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful - sc, err := archive.GetSubCluster(jobMeta.Cluster, jobMeta.SubCluster) - if err != nil { - cclog.Errorf("cannot get subcluster: %s", err.Error()) - return err - } - - jobMeta.Footprint = make(map[string]float64) - - for _, fp := range sc.Footprint { - statType := "avg" - - if i, err := archive.MetricIndex(sc.MetricConfig, fp); err != nil { - statType = sc.MetricConfig[i].Footprint - } - - name := fmt.Sprintf("%s_%s", fp, statType) - - jobMeta.Footprint[name] = repository.LoadJobStat(jobMeta, fp, statType) - } - - jobMeta.RawFootprint, err = json.Marshal(jobMeta.Footprint) - if err != nil { - cclog.Warn("Error while marshaling job footprint") - return err - } - - jobMeta.EnergyFootprint = make(map[string]float64) - - // Total Job Energy Outside Loop - totalEnergy := 0.0 - for _, fp := 
range sc.EnergyFootprint { - // Always Init Metric Energy Inside Loop - metricEnergy := 0.0 - if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil { - // Note: For DB data, calculate and save as kWh - if sc.MetricConfig[i].Energy == "energy" { // this metric has energy as unit (Joules) - cclog.Warnf("Update EnergyFootprint for Job %d and Metric %s on cluster %s: Set to 'energy' in cluster.json: Not implemented, will return 0.0", jobMeta.JobID, jobMeta.Cluster, fp) - // FIXME: Needs sum as stats type - } else if sc.MetricConfig[i].Energy == "power" { // this metric has power as unit (Watt) - // Energy: Power (in Watts) * Time (in Seconds) - // Unit: (W * (s / 3600)) / 1000 = kWh - // Round 2 Digits: round(Energy * 100) / 100 - // Here: (All-Node Metric Average * Number of Nodes) * (Job Duration in Seconds / 3600) / 1000 - // Note: Shared Jobs handled correctly since "Node Average" is based on partial resources, while "numNodes" factor is 1 - rawEnergy := ((repository.LoadJobStat(jobMeta, fp, "avg") * float64(jobMeta.NumNodes)) * (float64(jobMeta.Duration) / 3600.0)) / 1000.0 - metricEnergy = math.Round(rawEnergy*100.0) / 100.0 - } - } else { - cclog.Warnf("Error while collecting energy metric %s for job, DB ID '%v', return '0.0'", fp, jobMeta.ID) - } - - jobMeta.EnergyFootprint[fp] = metricEnergy - totalEnergy += metricEnergy - } - - jobMeta.Energy = (math.Round(totalEnergy*100.0) / 100.0) - if jobMeta.RawEnergyFootprint, err = json.Marshal(jobMeta.EnergyFootprint); err != nil { - cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", jobMeta.ID) - return err - } - - jobMeta.RawResources, err = json.Marshal(jobMeta.Resources) - if err != nil { - cclog.Errorf("repository initDB(): %v", err) - errorOccured++ - continue - } - - jobMeta.RawMetaData, err = json.Marshal(jobMeta.MetaData) - if err != nil { + if err := enrichJobMetadata(jobMeta); err != nil { cclog.Errorf("repository initDB(): %v", err) errorOccured++ continue @@ 
-163,9 +116,9 @@ func InitDB() error { for _, tag := range jobMeta.Tags { tagstr := tag.Name + ":" + tag.Type - tagId, ok := tags[tagstr] + tagID, ok := tags[tagstr] if !ok { - tagId, err = r.TransactionAdd(t, + tagID, err = r.TransactionAdd(t, addTagQuery, tag.Name, tag.Type) if err != nil { @@ -173,12 +126,12 @@ func InitDB() error { errorOccured++ continue } - tags[tagstr] = tagId + tags[tagstr] = tagID } r.TransactionAdd(t, setTagQuery, - id, tagId) + id, tagID) } if err == nil { @@ -195,7 +148,110 @@ func InitDB() error { return nil } -// This function also sets the subcluster if necessary! +// enrichJobMetadata calculates and populates job footprints, energy metrics, and serialized fields. +// +// This function performs the following enrichment operations: +// 1. Calculates job footprint metrics based on the subcluster configuration +// 2. Computes energy footprint and total energy consumption in kWh +// 3. Marshals footprints, resources, and metadata into JSON for database storage +// +// The function expects the job's MonitoringStatus and SubCluster to be already set. +// Energy calculations convert power metrics (Watts) to energy (kWh) using the formula: +// +// Energy (kWh) = (Power (W) * Duration (s) / 3600) / 1000 +// +// Returns an error if subcluster retrieval, metric indexing, or JSON marshaling fails. 
+func enrichJobMetadata(job *schema.Job) error { + sc, err := archive.GetSubCluster(job.Cluster, job.SubCluster) + if err != nil { + cclog.Errorf("cannot get subcluster: %s", err.Error()) + return err + } + + job.Footprint = make(map[string]float64) + + for _, fp := range sc.Footprint { + statType := "avg" + + if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil { + statType = sc.MetricConfig[i].Footprint + } + + name := fmt.Sprintf("%s_%s", fp, statType) + + job.Footprint[name] = repository.LoadJobStat(job, fp, statType) + } + + job.RawFootprint, err = json.Marshal(job.Footprint) + if err != nil { + cclog.Warn("Error while marshaling job footprint") + return err + } + + job.EnergyFootprint = make(map[string]float64) + + // Total Job Energy Outside Loop + totalEnergy := 0.0 + for _, fp := range sc.EnergyFootprint { + // Always Init Metric Energy Inside Loop + metricEnergy := 0.0 + if i, err := archive.MetricIndex(sc.MetricConfig, fp); err == nil { + // Note: For DB data, calculate and save as kWh + switch sc.MetricConfig[i].Energy { + case "energy": // this metric has energy as unit (Joules) + cclog.Warnf("Update EnergyFootprint for Job %d and Metric %s on cluster %s: Set to 'energy' in cluster.json: Not implemented, will return 0.0", job.JobID, fp, job.Cluster) + // FIXME: Needs sum as stats type + case "power": // this metric has power as unit (Watt) + // Energy: Power (in Watts) * Time (in Seconds) + // Unit: (W * (s / 3600)) / 1000 = kWh + // Round 2 Digits: round(Energy * 100) / 100 + // Here: (All-Node Metric Average * Number of Nodes) * (Job Duration in Seconds / 3600) / 1000 + // Note: Shared Jobs handled correctly since "Node Average" is based on partial resources, while "numNodes" factor is 1 + rawEnergy := ((repository.LoadJobStat(job, fp, "avg") * float64(job.NumNodes)) * (float64(job.Duration) / 3600.0)) / 1000.0 + metricEnergy = math.Round(rawEnergy*100.0) / 100.0 + } + } else { + cclog.Warnf("Error while collecting energy metric %s for 
job, DB ID '%v', return '0.0'", fp, job.ID) + } + + job.EnergyFootprint[fp] = metricEnergy + totalEnergy += metricEnergy + } + + job.Energy = (math.Round(totalEnergy*100.0) / 100.0) + if job.RawEnergyFootprint, err = json.Marshal(job.EnergyFootprint); err != nil { + cclog.Warnf("Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'", job.ID) + return err + } + + job.RawResources, err = json.Marshal(job.Resources) + if err != nil { + cclog.Warn("Error while marshaling job resources") + return err + } + + job.RawMetaData, err = json.Marshal(job.MetaData) + if err != nil { + cclog.Warn("Error while marshaling job metadata") + return err + } + + return nil +} + +// SanityChecks validates job metadata and ensures cluster/subcluster configuration is valid. +// +// This function performs the following validations: +// 1. Verifies the cluster exists in the archive configuration +// 2. Assigns and validates the subcluster (may modify job.SubCluster) +// 3. Validates job state is a recognized value +// 4. Ensures resources and user fields are populated +// 5. Validates node counts and hardware thread counts are positive +// 6. Verifies the number of resources matches the declared node count +// +// The function may modify the job's SubCluster field if it needs to be assigned. +// +// Returns an error if any validation check fails. func SanityChecks(job *schema.Job) error { if c := archive.GetCluster(job.Cluster); c == nil { return fmt.Errorf("no such cluster: %v", job.Cluster) @@ -220,6 +276,14 @@ func SanityChecks(job *schema.Job) error { return nil } +// checkJobData normalizes metric units in job data based on average values. +// +// NOTE: This function is currently unused and contains incomplete implementation. +// It was intended to normalize byte and file-related metrics to appropriate SI prefixes, +// but the normalization logic is commented out. Consider removing or completing this +// function based on project requirements. 
+// +// TODO: Either implement the metric normalization or remove this dead code. func checkJobData(d *schema.JobData) error { for _, scopes := range *d { // var newUnit schema.Unit diff --git a/internal/importer/normalize.go b/internal/importer/normalize.go index bc72cb3..fc5e537 100644 --- a/internal/importer/normalize.go +++ b/internal/importer/normalize.go @@ -10,6 +10,15 @@ import ( ccunits "github.com/ClusterCockpit/cc-lib/ccUnits" ) +// getNormalizationFactor calculates the scaling factor needed to normalize a value +// to a more readable range (typically between 1.0 and 1000.0). +// +// For values greater than 1000, the function scales down by factors of 1000 (returns negative exponent). +// For values less than 1.0, the function scales up by factors of 1000 (returns positive exponent). +// +// Returns: +// - factor: The multiplicative factor to apply (10^(count*scale)) +// - exponent: The power of 10 representing the adjustment (multiple of 3 for SI prefixes) func getNormalizationFactor(v float64) (float64, int) { count := 0 scale := -3 @@ -29,6 +38,14 @@ func getNormalizationFactor(v float64) (float64, int) { return math.Pow10(count * scale), count * scale } +// getExponent calculates the SI prefix exponent from a numeric prefix value. +// +// For example: +// - Input: 1000.0 (kilo) returns 3 +// - Input: 1000000.0 (mega) returns 6 +// - Input: 1000000000.0 (giga) returns 9 +// +// Returns the exponent representing the power of 10 for the SI prefix. func getExponent(p float64) int { count := 0 @@ -40,12 +57,42 @@ func getExponent(p float64) int { return count * 3 } +// newPrefixFromFactor computes a new SI unit prefix after applying a normalization factor. +// +// Given an original prefix and an exponent adjustment, this function calculates +// the resulting SI prefix. For example, if normalizing from bytes (no prefix) by +// a factor of 10^9, the result would be the "G" (giga) prefix. 
+// +// Parameters: +// - op: The original SI prefix value +// - e: The exponent adjustment to apply +// +// Returns the new SI prefix after adjustment. func newPrefixFromFactor(op ccunits.Prefix, e int) ccunits.Prefix { f := float64(op) exp := math.Pow10(getExponent(f) - e) return ccunits.Prefix(exp) } +// Normalize adjusts a metric value and its SI unit prefix to a more readable range. +// +// This function is useful for automatically scaling metrics to appropriate units. +// For example, normalizing 2048 MiB might result in ~2.0 GiB. +// +// The function analyzes the average value and determines if a different SI prefix +// would make the number more human-readable (typically keeping values between 1 and 1000). +// +// Parameters: +// - avg: The metric value to normalize +// - p: The current SI prefix as a string (e.g., "K", "M", "G") +// +// Returns: +// - factor: The multiplicative factor to apply to convert the value +// - newPrefix: The new SI prefix string to use +// +// Example: +// +// factor, newPrefix := Normalize(2048.0, "M") // returns factor for MB->GB conversion, "G" func Normalize(avg float64, p string) (float64, string) { f, e := getNormalizationFactor(avg) diff --git a/internal/importer/normalize_test.go b/internal/importer/normalize_test.go index 72017f5..6aa1ed2 100644 --- a/internal/importer/normalize_test.go +++ b/internal/importer/normalize_test.go @@ -11,6 +11,8 @@ import ( ccunits "github.com/ClusterCockpit/cc-lib/ccUnits" ) +// TestNormalizeFactor tests the normalization of large byte values to gigabyte prefix. +// Verifies that values in the billions are correctly scaled to the "G" (giga) prefix. func TestNormalizeFactor(t *testing.T) { // var us string s := []float64{2890031237, 23998994567, 389734042344, 390349424345} @@ -38,6 +40,8 @@ func TestNormalizeFactor(t *testing.T) { } } +// TestNormalizeKeep tests that values already in an appropriate range maintain their prefix. 
+// Verifies that when values don't require rescaling, the original "G" prefix is preserved. func TestNormalizeKeep(t *testing.T) { s := []float64{3.0, 24.0, 390.0, 391.0}