From 520c814e3bd5f518cccf83dbb581a50ded867533 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 13 Sep 2022 15:20:07 +0200 Subject: [PATCH] Introduce import job flag --- cmd/cc-backend/main.go | 10 ++- internal/repository/init.go | 133 ++++++++++++++++++++++++++++++++---- pkg/archive/archive.go | 5 -- 3 files changed, 128 insertions(+), 20 deletions(-) diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 6d0a987..eb44313 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -43,11 +43,12 @@ import ( _ "github.com/go-sql-driver/mysql" _ "github.com/mattn/go-sqlite3" + _ "github.com/santhosh-tekuri/jsonschema/v5/httploader" ) func main() { var flagReinitDB, flagServer, flagSyncLDAP, flagGops, flagDev bool - var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile string + var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob string flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)") flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the 'user' table with ldap") flag.BoolVar(&flagServer, "server", false, "Do not start a server, stop right after initialization and argument handling") @@ -57,6 +58,7 @@ func main() { flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `:[admin,support,api,user]:`") flag.StringVar(&flagDelUser, "del-user", "", "Remove user by `username`") flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by its `username`") + flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `:,...`") flag.Parse() // See https://github.com/google/gops (Runtime overhead is almost zero) @@ -163,6 +165,12 @@ func main() { } } + if flagImportJob != "" { + if err := repository.HandleImportFlag(flagImportJob); err != nil { + log.Fatalf("import failed: %s", err.Error()) + } + } + if !flagServer { return } diff --git a/internal/repository/init.go b/internal/repository/init.go index 0f69ea5..fcc2c31 100644 --- a/internal/repository/init.go +++ b/internal/repository/init.go @@ -5,10 +5,15 @@ package repository import ( + "bytes" + "database/sql" "encoding/json" "fmt" + "os" + "strings" "time" + "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -85,6 +90,107 @@ const NamedJobInsert string = `INSERT INTO job ( :mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total );` +// Import all jobs specified as `:,...` +func HandleImportFlag(flag string) error { + for _, pair := range strings.Split(flag, ",") { + files := strings.Split(pair, ":") + if len(files) != 2 { + return fmt.Errorf("invalid import flag format") + } + + raw, err := os.ReadFile(files[0]) + if err != nil { + return err + } + + if config.Keys.Validate { + if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil { + return err + } + } + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.DisallowUnknownFields() + jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults} + if err := dec.Decode(&jobMeta); err != nil { + return err + } + + raw, err = os.ReadFile(files[1]) + if err != nil { + return err + } + + if config.Keys.Validate { + if err := schema.Validate(schema.Data, bytes.NewReader(raw)); err != nil { + return err + } + } + dec = json.NewDecoder(bytes.NewReader(raw)) + dec.DisallowUnknownFields() + jobData := schema.JobData{} + if err := dec.Decode(&jobData); err != nil { + return err + } + + SanityChecks(&jobMeta.BaseJob) + jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful + if job, err := GetJobRepository().Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows { + if err != nil { + return err + } + + return fmt.Errorf("a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID) + } + + job := schema.Job{ + BaseJob: jobMeta.BaseJob, + StartTime: time.Unix(jobMeta.StartTime, 0), + StartTimeUnix: jobMeta.StartTime, + } + + // TODO: Other metrics... + job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any") + job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw") + job.NetBwAvg = loadJobStat(&jobMeta, "net_bw") + job.FileBwAvg = loadJobStat(&jobMeta, "file_bw") + job.RawResources, err = json.Marshal(job.Resources) + if err != nil { + return err + } + job.RawMetaData, err = json.Marshal(job.MetaData) + if err != nil { + return err + } + + if err := SanityChecks(&job.BaseJob); err != nil { + return err + } + + if err := archive.GetHandle().ImportJob(&jobMeta, &jobData); err != nil { + return err + } + + res, err := GetConnection().DB.NamedExec(NamedJobInsert, job) + if err != nil { + return err + } + + id, err := res.LastInsertId() + if err != nil { + return err + } + + for _, tag := range job.Tags { + if _, err := GetJobRepository().AddTagOrCreate(id, tag.Type, tag.Name); err != nil { + return err + } + } + + log.Infof("Successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id) + } + return nil +} + // Delete the tables "job", "tag" and "jobtag" from the database and // repopulate them using the jobs found in `archive`. func InitDB() error { @@ -118,11 +224,10 @@ func InitDB() error { ar := archive.GetHandle() i := 0 - errorOccured := false + errorOccured := 0 for jobMeta := range ar.Iter() { - fmt.Printf("Import job %d\n", jobMeta.JobID) // // Bundle 100 inserts into one transaction for better performance: if i%10 == 0 { if tx != nil { @@ -155,35 +260,35 @@ func InitDB() error { job.RawResources, err = json.Marshal(job.Resources) if err != nil { - log.Errorf("fsBackend LoadClusterCfg()- %v", err) - errorOccured = true + log.Errorf("repository initDB()- %v", err) + errorOccured++ continue } job.RawMetaData, err = json.Marshal(job.MetaData) if err != nil { - log.Errorf("fsBackend LoadClusterCfg()- %v", err) - errorOccured = true + log.Errorf("repository initDB()- %v", err) + errorOccured++ continue } if err := SanityChecks(&job.BaseJob); err != nil { - log.Errorf("fsBackend LoadClusterCfg()- %v", err) - errorOccured = true + log.Errorf("repository initDB()- %v", err) + errorOccured++ continue } res, err := stmt.Exec(job) if err != nil { - log.Errorf("fsBackend LoadClusterCfg()- %v", err) - errorOccured = true + log.Errorf("repository initDB()- %v", err) + errorOccured++ continue } id, err := res.LastInsertId() if err != nil { - log.Errorf("fsBackend LoadClusterCfg()- %v", err) - errorOccured = true + log.Errorf("repository initDB()- %v", err) + errorOccured++ continue } @@ -212,8 +317,8 @@ func InitDB() error { } } - if errorOccured { - log.Errorf("An error occured!") + if errorOccured > 0 { + log.Errorf("Error in import of %d jobs!", errorOccured) } if err := tx.Commit(); err != nil { diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 463a719..d4d1582 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -92,11 +92,6 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) { return metaFile.Statistics, nil } -func Import(job *schema.JobMeta, jobData *schema.JobData) error { - - return ar.ImportJob(job, jobData) -} - // If the job is archived, find its `meta.json` file and override the tags list // in that JSON file. If the job is not archived, nothing is done. func UpdateTags(job *schema.Job, tags []*schema.Tag) error {