// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package importer
import (
"bytes"
"encoding/json"
"fmt"
	"math"
"os"
"strings"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
// Import all jobs specified as `<path-to-meta.json>:<path-to-data.json>,...`
func HandleImportFlag ( flag string ) error {
r := repository . GetJobRepository ( )
for _ , pair := range strings . Split ( flag , "," ) {
files := strings . Split ( pair , ":" )
if len ( files ) != 2 {
return fmt . Errorf ( "REPOSITORY/INIT > invalid import flag format" )
}
raw , err := os . ReadFile ( files [ 0 ] )
if err != nil {
log . Warn ( "Error while reading metadata file for import" )
return err
}
if config . Keys . Validate {
2023-05-04 07:00:30 +02:00
if err = schema . Validate ( schema . Meta , bytes . NewReader ( raw ) ) ; err != nil {
2023-04-28 08:49:58 +02:00
return fmt . Errorf ( "REPOSITORY/INIT > validate job meta: %v" , err )
}
}
dec := json . NewDecoder ( bytes . NewReader ( raw ) )
dec . DisallowUnknownFields ( )
2024-07-04 15:05:24 +02:00
job := schema . JobMeta { BaseJob : schema . JobDefaults }
if err = dec . Decode ( & job ) ; err != nil {
2023-04-28 08:49:58 +02:00
log . Warn ( "Error while decoding raw json metadata for import" )
return err
}
raw , err = os . ReadFile ( files [ 1 ] )
if err != nil {
log . Warn ( "Error while reading jobdata file for import" )
return err
}
if config . Keys . Validate {
2023-05-04 07:00:30 +02:00
if err = schema . Validate ( schema . Data , bytes . NewReader ( raw ) ) ; err != nil {
2023-04-28 08:49:58 +02:00
return fmt . Errorf ( "REPOSITORY/INIT > validate job data: %v" , err )
}
}
dec = json . NewDecoder ( bytes . NewReader ( raw ) )
dec . DisallowUnknownFields ( )
jobData := schema . JobData { }
2023-05-04 07:00:30 +02:00
if err = dec . Decode ( & jobData ) ; err != nil {
2023-04-28 08:49:58 +02:00
log . Warn ( "Error while decoding raw json jobdata for import" )
return err
}
2024-07-04 15:05:24 +02:00
job . MonitoringStatus = schema . MonitoringStatusArchivingSuccessful
2023-04-28 08:49:58 +02:00
2024-07-04 15:05:24 +02:00
sc , err := archive . GetSubCluster ( job . Cluster , job . SubCluster )
2024-07-04 14:14:27 +02:00
if err != nil {
log . Errorf ( "cannot get subcluster: %s" , err . Error ( ) )
return err
}
job . Footprint = make ( map [ string ] float64 )
for _ , fp := range sc . Footprint {
2024-08-30 13:50:49 +02:00
statType := "avg"
if i , err := archive . MetricIndex ( sc . MetricConfig , fp ) ; err != nil {
statType = sc . MetricConfig [ i ] . Footprint
}
name := fmt . Sprintf ( "%s_%s" , fp , statType )
2024-11-16 06:36:55 +01:00
job . Footprint [ name ] = repository . LoadJobStat ( & job , fp , statType )
2024-07-04 14:14:27 +02:00
}
2024-08-30 13:50:49 +02:00
2024-07-04 14:14:27 +02:00
job . RawFootprint , err = json . Marshal ( job . Footprint )
if err != nil {
log . Warn ( "Error while marshaling job footprint" )
return err
}
2024-11-16 06:36:55 +01:00
job . EnergyFootprint = make ( map [ string ] float64 )
2025-03-06 12:46:25 +01:00
// Total Job Energy Outside Loop
totalEnergy := 0.0
2024-11-16 06:36:55 +01:00
for _ , fp := range sc . EnergyFootprint {
2025-03-06 12:46:25 +01:00
// Always Init Metric Energy Inside Loop
metricEnergy := 0.0
2024-11-16 06:36:55 +01:00
if i , err := archive . MetricIndex ( sc . MetricConfig , fp ) ; err == nil {
// Note: For DB data, calculate and save as kWh
if sc . MetricConfig [ i ] . Energy == "energy" { // this metric has energy as unit (Joules)
2025-03-06 12:46:25 +01:00
log . Warnf ( "Update EnergyFootprint for Job %d and Metric %s on cluster %s: Set to 'energy' in cluster.json: Not implemented, will return 0.0" , job . JobID , job . Cluster , fp )
// FIXME: Needs sum as stats type
2024-11-16 06:36:55 +01:00
} else if sc . MetricConfig [ i ] . Energy == "power" { // this metric has power as unit (Watt)
2025-03-06 12:46:25 +01:00
// Energy: Power (in Watts) * Time (in Seconds)
// Unit: (W * (s / 3600)) / 1000 = kWh
// Round 2 Digits: round(Energy * 100) / 100
// Here: (All-Node Metric Average * Number of Nodes) * (Job Duration in Seconds / 3600) / 1000
// Note: Shared Jobs handled correctly since "Node Average" is based on partial resources, while "numNodes" factor is 1
rawEnergy := ( ( repository . LoadJobStat ( & job , fp , "avg" ) * float64 ( job . NumNodes ) ) * ( float64 ( job . Duration ) / 3600.0 ) ) / 1000.0
metricEnergy = math . Round ( rawEnergy * 100.0 ) / 100.0
2024-11-16 06:36:55 +01:00
}
} else {
log . Warnf ( "Error while collecting energy metric %s for job, DB ID '%v', return '0.0'" , fp , job . ID )
}
2025-03-06 12:46:25 +01:00
job . EnergyFootprint [ fp ] = metricEnergy
totalEnergy += metricEnergy
2024-11-16 06:36:55 +01:00
}
2025-03-06 12:46:25 +01:00
job . Energy = ( math . Round ( totalEnergy * 100.0 ) / 100.0 )
2024-11-16 06:36:55 +01:00
if job . RawEnergyFootprint , err = json . Marshal ( job . EnergyFootprint ) ; err != nil {
log . Warnf ( "Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'" , job . ID )
return err
}
2023-04-28 08:49:58 +02:00
job . RawResources , err = json . Marshal ( job . Resources )
if err != nil {
log . Warn ( "Error while marshaling job resources" )
return err
}
job . RawMetaData , err = json . Marshal ( job . MetaData )
if err != nil {
log . Warn ( "Error while marshaling job metadata" )
return err
}
2023-05-04 07:00:30 +02:00
if err = SanityChecks ( & job . BaseJob ) ; err != nil {
2023-04-28 08:49:58 +02:00
log . Warn ( "BaseJob SanityChecks failed" )
return err
}
2024-07-04 15:05:24 +02:00
if err = archive . GetHandle ( ) . ImportJob ( & job , & jobData ) ; err != nil {
2023-04-28 08:49:58 +02:00
log . Error ( "Error while importing job" )
return err
}
id , err := r . InsertJob ( & job )
if err != nil {
log . Warn ( "Error while job db insert" )
return err
}
for _ , tag := range job . Tags {
2024-08-01 18:59:24 +02:00
if err := r . ImportTag ( id , tag . Type , tag . Name , tag . Scope ) ; err != nil {
log . Error ( "Error while adding or creating tag on import" )
2023-04-28 08:49:58 +02:00
return err
}
}
log . Infof ( "successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)" , job . JobID , job . Cluster , id )
}
return nil
}