mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-07-23 04:51:39 +02:00
Introduce import job flag
This commit is contained in:
@@ -5,10 +5,15 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-backend/internal/config"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/log"
|
||||
"github.com/ClusterCockpit/cc-backend/pkg/schema"
|
||||
@@ -85,6 +90,107 @@ const NamedJobInsert string = `INSERT INTO job (
|
||||
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
|
||||
);`
|
||||
|
||||
// Import all jobs specified as `<path-to-meta.json>:<path-to-data.json>,...`
|
||||
func HandleImportFlag(flag string) error {
|
||||
for _, pair := range strings.Split(flag, ",") {
|
||||
files := strings.Split(pair, ":")
|
||||
if len(files) != 2 {
|
||||
return fmt.Errorf("invalid import flag format")
|
||||
}
|
||||
|
||||
raw, err := os.ReadFile(files[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
dec := json.NewDecoder(bytes.NewReader(raw))
|
||||
dec.DisallowUnknownFields()
|
||||
jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
|
||||
if err := dec.Decode(&jobMeta); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
raw, err = os.ReadFile(files[1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if config.Keys.Validate {
|
||||
if err := schema.Validate(schema.Data, bytes.NewReader(raw)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
dec = json.NewDecoder(bytes.NewReader(raw))
|
||||
dec.DisallowUnknownFields()
|
||||
jobData := schema.JobData{}
|
||||
if err := dec.Decode(&jobData); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
SanityChecks(&jobMeta.BaseJob)
|
||||
jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
|
||||
if job, err := GetJobRepository().Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return fmt.Errorf("a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID)
|
||||
}
|
||||
|
||||
job := schema.Job{
|
||||
BaseJob: jobMeta.BaseJob,
|
||||
StartTime: time.Unix(jobMeta.StartTime, 0),
|
||||
StartTimeUnix: jobMeta.StartTime,
|
||||
}
|
||||
|
||||
// TODO: Other metrics...
|
||||
job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any")
|
||||
job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw")
|
||||
job.NetBwAvg = loadJobStat(&jobMeta, "net_bw")
|
||||
job.FileBwAvg = loadJobStat(&jobMeta, "file_bw")
|
||||
job.RawResources, err = json.Marshal(job.Resources)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
job.RawMetaData, err = json.Marshal(job.MetaData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := SanityChecks(&job.BaseJob); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := archive.GetHandle().ImportJob(&jobMeta, &jobData); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := GetConnection().DB.NamedExec(NamedJobInsert, job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
id, err := res.LastInsertId()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, tag := range job.Tags {
|
||||
if _, err := GetJobRepository().AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("Successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete the tables "job", "tag" and "jobtag" from the database and
|
||||
// repopulate them using the jobs found in `archive`.
|
||||
func InitDB() error {
|
||||
@@ -118,11 +224,10 @@ func InitDB() error {
|
||||
|
||||
ar := archive.GetHandle()
|
||||
i := 0
|
||||
errorOccured := false
|
||||
errorOccured := 0
|
||||
|
||||
for jobMeta := range ar.Iter() {
|
||||
|
||||
fmt.Printf("Import job %d\n", jobMeta.JobID)
|
||||
// // Bundle 100 inserts into one transaction for better performance:
|
||||
if i%10 == 0 {
|
||||
if tx != nil {
|
||||
@@ -155,35 +260,35 @@ func InitDB() error {
|
||||
|
||||
job.RawResources, err = json.Marshal(job.Resources)
|
||||
if err != nil {
|
||||
log.Errorf("fsBackend LoadClusterCfg()- %v", err)
|
||||
errorOccured = true
|
||||
log.Errorf("repository initDB()- %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
job.RawMetaData, err = json.Marshal(job.MetaData)
|
||||
if err != nil {
|
||||
log.Errorf("fsBackend LoadClusterCfg()- %v", err)
|
||||
errorOccured = true
|
||||
log.Errorf("repository initDB()- %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
if err := SanityChecks(&job.BaseJob); err != nil {
|
||||
log.Errorf("fsBackend LoadClusterCfg()- %v", err)
|
||||
errorOccured = true
|
||||
log.Errorf("repository initDB()- %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
res, err := stmt.Exec(job)
|
||||
if err != nil {
|
||||
log.Errorf("fsBackend LoadClusterCfg()- %v", err)
|
||||
errorOccured = true
|
||||
log.Errorf("repository initDB()- %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
id, err := res.LastInsertId()
|
||||
if err != nil {
|
||||
log.Errorf("fsBackend LoadClusterCfg()- %v", err)
|
||||
errorOccured = true
|
||||
log.Errorf("repository initDB()- %v", err)
|
||||
errorOccured++
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -212,8 +317,8 @@ func InitDB() error {
|
||||
}
|
||||
}
|
||||
|
||||
if errorOccured {
|
||||
log.Errorf("An error occured!")
|
||||
if errorOccured > 0 {
|
||||
log.Errorf("Error in import of %d jobs!", errorOccured)
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
|
Reference in New Issue
Block a user