From c64a9357502b994edb0c255de0f59e8c2cdb3c33 Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Sun, 11 Sep 2022 07:13:08 +0200
Subject: [PATCH] Cleanup and improve error handling

---
 internal/repository/init.go | 120 +++++++++--------------------------
 1 file changed, 29 insertions(+), 91 deletions(-)

diff --git a/internal/repository/init.go b/internal/repository/init.go
index 531b5e3..6d0c6a9 100644
--- a/internal/repository/init.go
+++ b/internal/repository/init.go
@@ -5,17 +5,13 @@ package repository
 
 import (
-    "bufio"
     "encoding/json"
     "fmt"
-    "os"
-    "path/filepath"
     "time"
 
     "github.com/ClusterCockpit/cc-backend/pkg/archive"
     "github.com/ClusterCockpit/cc-backend/pkg/log"
     "github.com/ClusterCockpit/cc-backend/pkg/schema"
-    "github.com/jmoiron/sqlx"
 )
 
 // `AUTO_INCREMENT` is in a comment because of this hack:
@@ -113,19 +109,22 @@ func InitDB() error {
     if err != nil {
         return err
     }
+    tags := make(map[string]int64)
 
     // Not using log.Print because we want the line to end with `\r` and
     // this function is only ever called when a special command line flag
     // is passed anyways.
     fmt.Printf("%d jobs inserted...\r", 0)
 
-    tags := make(map[string]int64)
     ar := archive.GetHandle()
     i := 0
+    errorOccured := false
     for jobMeta := range ar.Iter() {
-        // Bundle 100 inserts into one transaction for better performance:
-        if i%100 == 0 {
+
+        fmt.Printf("Import job %d\n", jobMeta.JobID)
+        // // Bundle 100 inserts into one transaction for better performance:
+        if i%10 == 0 {
             if tx != nil {
                 if err := tx.Commit(); err != nil {
                     return err
                 }
@@ -156,26 +155,36 @@ func InitDB() error {
 
         job.RawResources, err = json.Marshal(job.Resources)
         if err != nil {
-            return err
+            log.Errorf("fsBackend LoadClusterCfg()- %v", err)
+            errorOccured = true
+            continue
         }
 
         job.RawMetaData, err = json.Marshal(job.MetaData)
         if err != nil {
-            return err
+            log.Errorf("fsBackend LoadClusterCfg()- %v", err)
+            errorOccured = true
+            continue
         }
 
         if err := SanityChecks(&job.BaseJob); err != nil {
-            return err
+            log.Errorf("fsBackend LoadClusterCfg()- %v", err)
+            errorOccured = true
+            continue
         }
 
-        res, err := db.DB.NamedExec(NamedJobInsert, job)
+        res, err := stmt.Exec(job)
         if err != nil {
-            return err
+            log.Errorf("fsBackend LoadClusterCfg()- %v", err)
+            errorOccured = true
+            continue
         }
 
         id, err := res.LastInsertId()
         if err != nil {
-            return err
+            log.Errorf("fsBackend LoadClusterCfg()- %v", err)
+            errorOccured = true
+            continue
         }
 
         for _, tag := range job.Tags {
@@ -203,6 +212,10 @@ func InitDB() error {
         }
     }
 
+    if errorOccured {
+        log.Errorf("An error occured!")
+    }
+
     if err := tx.Commit(); err != nil {
         return err
     }
@@ -217,84 +230,6 @@ func InitDB() error {
     return nil
 }
 
-// TODO: Remove double logic, use repository/import.go!
-// Read the `meta.json` file at `path` and insert it to the database using the prepared
-// insert statement `stmt`. `tags` maps all existing tags to their database ID.
-func loadJob(tx *sqlx.Tx,
-    stmt *sqlx.NamedStmt,
-    tags map[string]int64,
-    path string) error {
-    f, err := os.Open(filepath.Join(path, "meta.json"))
-    if err != nil {
-        return err
-    }
-    defer f.Close()
-
-    jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
-    if err := json.NewDecoder(bufio.NewReader(f)).Decode(&jobMeta); err != nil {
-        return err
-    }
-
-    jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
-    job := schema.Job{
-        BaseJob:       jobMeta.BaseJob,
-        StartTime:     time.Unix(jobMeta.StartTime, 0),
-        StartTimeUnix: jobMeta.StartTime,
-    }
-
-    // TODO: Other metrics...
-    job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any")
-    job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw")
-    job.NetBwAvg = loadJobStat(&jobMeta, "net_bw")
-    job.FileBwAvg = loadJobStat(&jobMeta, "file_bw")
-
-    job.RawResources, err = json.Marshal(job.Resources)
-    if err != nil {
-        return err
-    }
-
-    job.RawMetaData, err = json.Marshal(job.MetaData)
-    if err != nil {
-        return err
-    }
-
-    if err := SanityChecks(&job.BaseJob); err != nil {
-        return err
-    }
-
-    res, err := stmt.Exec(job)
-    if err != nil {
-        return err
-    }
-
-    id, err := res.LastInsertId()
-    if err != nil {
-        return err
-    }
-
-    for _, tag := range job.Tags {
-        tagstr := tag.Name + ":" + tag.Type
-        tagId, ok := tags[tagstr]
-        if !ok {
-            res, err := tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
-            if err != nil {
-                return err
-            }
-            tagId, err = res.LastInsertId()
-            if err != nil {
-                return err
-            }
-            tags[tagstr] = tagId
-        }
-
-        if _, err := tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, id, tagId); err != nil {
-            return err
-        }
-    }
-
-    return nil
-}
-
 // This function also sets the subcluster if necessary!
 func SanityChecks(job *schema.BaseJob) error {
     if c := archive.GetCluster(job.Cluster); c == nil {
@@ -315,6 +250,9 @@ func SanityChecks(job *schema.BaseJob) error {
     if len(job.Resources) != int(job.NumNodes) {
         return fmt.Errorf("len(resources) does not equal numNodes (%d vs %d)", len(job.Resources), job.NumNodes)
     }
+    if job.ArrayJobId == 0 {
+        job.ArrayJobId = job.JobID
+    }
 
     return nil
 }