// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package importer
import (
	"encoding/json"
	"fmt"
	"math"
	"strings"
	"time"

	"github.com/ClusterCockpit/cc-backend/internal/repository"
	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	"github.com/ClusterCockpit/cc-backend/pkg/log"
	"github.com/ClusterCockpit/cc-backend/pkg/schema"
)
// SQL statements used while repopulating the tag tables.
const (
	// addTagQuery inserts a new tag definition (name/type pair).
	addTagQuery = "INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)"
	// setTagQuery links an existing tag to a job.
	setTagQuery = "INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)"
)
2023-04-28 08:49:58 +02:00
// Delete the tables "job", "tag" and "jobtag" from the database and
// repopulate them using the jobs found in `archive`.
func InitDB ( ) error {
r := repository . GetJobRepository ( )
2023-05-04 16:03:04 +02:00
if err := r . Flush ( ) ; err != nil {
log . Errorf ( "repository initDB(): %v" , err )
return err
}
2023-04-28 08:49:58 +02:00
starttime := time . Now ( )
log . Print ( "Building job table..." )
2024-09-03 15:40:02 +02:00
t , err := r . TransactionInit ( )
2023-04-28 08:49:58 +02:00
if err != nil {
log . Warn ( "Error while initializing SQL transactions" )
return err
}
tags := make ( map [ string ] int64 )
// Not using log.Print because we want the line to end with `\r` and
// this function is only ever called when a special command line flag
// is passed anyways.
fmt . Printf ( "%d jobs inserted...\r" , 0 )
ar := archive . GetHandle ( )
i := 0
errorOccured := 0
for jobContainer := range ar . Iter ( false ) {
jobMeta := jobContainer . Meta
// Bundle 100 inserts into one transaction for better performance
if i % 100 == 0 {
r . TransactionCommit ( t )
fmt . Printf ( "%d jobs inserted...\r" , i )
}
jobMeta . MonitoringStatus = schema . MonitoringStatusArchivingSuccessful
job := schema . Job {
BaseJob : jobMeta . BaseJob ,
StartTime : time . Unix ( jobMeta . StartTime , 0 ) ,
StartTimeUnix : jobMeta . StartTime ,
}
2024-07-04 14:14:27 +02:00
sc , err := archive . GetSubCluster ( jobMeta . Cluster , jobMeta . SubCluster )
if err != nil {
log . Errorf ( "cannot get subcluster: %s" , err . Error ( ) )
return err
}
2024-11-16 06:36:55 +01:00
2024-07-04 14:14:27 +02:00
job . Footprint = make ( map [ string ] float64 )
for _ , fp := range sc . Footprint {
2024-08-30 13:50:49 +02:00
statType := "avg"
if i , err := archive . MetricIndex ( sc . MetricConfig , fp ) ; err != nil {
statType = sc . MetricConfig [ i ] . Footprint
}
name := fmt . Sprintf ( "%s_%s" , fp , statType )
2024-11-14 19:13:07 +01:00
job . Footprint [ name ] = repository . LoadJobStat ( jobMeta , fp , statType )
2024-07-04 14:14:27 +02:00
}
job . RawFootprint , err = json . Marshal ( job . Footprint )
if err != nil {
log . Warn ( "Error while marshaling job footprint" )
return err
}
2023-04-28 08:49:58 +02:00
2024-11-16 06:36:55 +01:00
job . EnergyFootprint = make ( map [ string ] float64 )
2025-03-06 12:46:25 +01:00
// Total Job Energy Outside Loop
totalEnergy := 0.0
2024-11-16 06:36:55 +01:00
for _ , fp := range sc . EnergyFootprint {
2025-03-06 12:46:25 +01:00
// Always Init Metric Energy Inside Loop
metricEnergy := 0.0
2024-11-16 06:36:55 +01:00
if i , err := archive . MetricIndex ( sc . MetricConfig , fp ) ; err == nil {
// Note: For DB data, calculate and save as kWh
if sc . MetricConfig [ i ] . Energy == "energy" { // this metric has energy as unit (Joules)
2025-03-06 12:46:25 +01:00
log . Warnf ( "Update EnergyFootprint for Job %d and Metric %s on cluster %s: Set to 'energy' in cluster.json: Not implemented, will return 0.0" , jobMeta . JobID , jobMeta . Cluster , fp )
// FIXME: Needs sum as stats type
2024-11-16 06:36:55 +01:00
} else if sc . MetricConfig [ i ] . Energy == "power" { // this metric has power as unit (Watt)
2025-03-06 12:46:25 +01:00
// Energy: Power (in Watts) * Time (in Seconds)
// Unit: (W * (s / 3600)) / 1000 = kWh
// Round 2 Digits: round(Energy * 100) / 100
// Here: (All-Node Metric Average * Number of Nodes) * (Job Duration in Seconds / 3600) / 1000
// Note: Shared Jobs handled correctly since "Node Average" is based on partial resources, while "numNodes" factor is 1
rawEnergy := ( ( repository . LoadJobStat ( jobMeta , fp , "avg" ) * float64 ( jobMeta . NumNodes ) ) * ( float64 ( jobMeta . Duration ) / 3600.0 ) ) / 1000.0
metricEnergy = math . Round ( rawEnergy * 100.0 ) / 100.0
2024-11-16 06:36:55 +01:00
}
} else {
log . Warnf ( "Error while collecting energy metric %s for job, DB ID '%v', return '0.0'" , fp , jobMeta . ID )
}
2025-03-06 12:46:25 +01:00
job . EnergyFootprint [ fp ] = metricEnergy
totalEnergy += metricEnergy
2024-11-16 06:36:55 +01:00
}
2025-03-06 12:46:25 +01:00
job . Energy = ( math . Round ( totalEnergy * 100.0 ) / 100.0 )
2024-11-16 06:36:55 +01:00
if job . RawEnergyFootprint , err = json . Marshal ( job . EnergyFootprint ) ; err != nil {
log . Warnf ( "Error while marshaling energy footprint for job INTO BYTES, DB ID '%v'" , jobMeta . ID )
return err
}
2023-04-28 08:49:58 +02:00
job . RawResources , err = json . Marshal ( job . Resources )
if err != nil {
log . Errorf ( "repository initDB(): %v" , err )
errorOccured ++
continue
}
job . RawMetaData , err = json . Marshal ( job . MetaData )
if err != nil {
log . Errorf ( "repository initDB(): %v" , err )
errorOccured ++
continue
}
if err := SanityChecks ( & job . BaseJob ) ; err != nil {
log . Errorf ( "repository initDB(): %v" , err )
errorOccured ++
continue
}
2024-09-03 15:40:02 +02:00
id , err := r . TransactionAddNamed ( t ,
repository . NamedJobInsert , job )
2023-04-28 08:49:58 +02:00
if err != nil {
log . Errorf ( "repository initDB(): %v" , err )
errorOccured ++
continue
}
for _ , tag := range job . Tags {
tagstr := tag . Name + ":" + tag . Type
tagId , ok := tags [ tagstr ]
if ! ok {
2024-09-03 15:40:02 +02:00
tagId , err = r . TransactionAdd ( t ,
addTagQuery ,
tag . Name , tag . Type )
2023-04-28 08:49:58 +02:00
if err != nil {
log . Errorf ( "Error adding tag: %v" , err )
errorOccured ++
continue
}
tags [ tagstr ] = tagId
}
2024-09-03 15:40:02 +02:00
r . TransactionAdd ( t ,
setTagQuery ,
id , tagId )
2023-04-28 08:49:58 +02:00
}
if err == nil {
i += 1
}
}
if errorOccured > 0 {
log . Warnf ( "Error in import of %d jobs!" , errorOccured )
}
r . TransactionEnd ( t )
log . Printf ( "A total of %d jobs have been registered in %.3f seconds.\n" , i , time . Since ( starttime ) . Seconds ( ) )
return nil
}
// This function also sets the subcluster if necessary!
func SanityChecks ( job * schema . BaseJob ) error {
if c := archive . GetCluster ( job . Cluster ) ; c == nil {
return fmt . Errorf ( "no such cluster: %v" , job . Cluster )
}
if err := archive . AssignSubCluster ( job ) ; err != nil {
log . Warn ( "Error while assigning subcluster to job" )
return err
}
if ! job . State . Valid ( ) {
return fmt . Errorf ( "not a valid job state: %v" , job . State )
}
if len ( job . Resources ) == 0 || len ( job . User ) == 0 {
return fmt . Errorf ( "'resources' and 'user' should not be empty" )
}
2023-05-04 07:00:30 +02:00
if job . NumAcc < 0 || job . NumHWThreads < 0 || job . NumNodes < 1 {
2023-04-28 08:49:58 +02:00
return fmt . Errorf ( "'numNodes', 'numAcc' or 'numHWThreads' invalid" )
}
if len ( job . Resources ) != int ( job . NumNodes ) {
return fmt . Errorf ( "len(resources) does not equal numNodes (%d vs %d)" , len ( job . Resources ) , job . NumNodes )
}
return nil
}
func checkJobData ( d * schema . JobData ) error {
for _ , scopes := range * d {
// var newUnit schema.Unit
// TODO Add node scope if missing
for _ , metric := range scopes {
if strings . Contains ( metric . Unit . Base , "B/s" ) ||
strings . Contains ( metric . Unit . Base , "F/s" ) ||
strings . Contains ( metric . Unit . Base , "B" ) {
// get overall avg
sum := 0.0
for _ , s := range metric . Series {
sum += s . Statistics . Avg
}
avg := sum / float64 ( len ( metric . Series ) )
2023-05-04 16:03:04 +02:00
f , p := Normalize ( avg , metric . Unit . Prefix )
2023-04-28 08:49:58 +02:00
if p != metric . Unit . Prefix {
fmt . Printf ( "Convert %e" , f )
// for _, s := range metric.Series {
// fp := schema.ConvertFloatToFloat64(s.Data)
//
// for i := 0; i < len(fp); i++ {
// fp[i] *= f
// fp[i] = math.Ceil(fp[i])
// }
//
// s.Data = schema.GetFloat64ToFloat(fp)
// }
metric . Unit . Prefix = p
}
}
}
}
return nil
}