From 49fbfc23d4abfdfb56591746ac30b6f78d3afbf1 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Thu, 24 Feb 2022 11:54:36 +0100 Subject: [PATCH] Add import command line flag --- init-db.go | 14 ++--- metricdata/archive.go | 73 ++++++++++++++----------- repository/import.go | 123 ++++++++++++++++++++++++++++++++++++++++++ repository/job.go | 1 + server.go | 26 ++++++--- 5 files changed, 189 insertions(+), 48 deletions(-) create mode 100644 repository/import.go diff --git a/init-db.go b/init-db.go index f53ae05..3c2595f 100644 --- a/init-db.go +++ b/init-db.go @@ -9,6 +9,7 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/log" + "github.com/ClusterCockpit/cc-backend/repository" "github.com/ClusterCockpit/cc-backend/schema" "github.com/jmoiron/sqlx" ) @@ -97,15 +98,7 @@ func initDB(db *sqlx.DB, archive string) error { return err } - stmt, err := tx.PrepareNamed(`INSERT INTO job ( - job_id, user, project, cluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc, - exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data, - mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total - ) VALUES ( - :job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, - :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data, - :mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total - );`) + stmt, err := tx.PrepareNamed(repository.NamedJobInsert) if err != nil { return err } @@ -192,6 +185,7 @@ func initDB(db *sqlx.DB, archive string) error { return nil } +// TODO: Remove double logic, use repository/import.go! // Read the `meta.json` file at `path` and insert it to the database using the prepared // insert statement `stmt`. `tags` maps all existing tags to their database ID. 
func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path string) error { @@ -216,6 +210,8 @@ func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path stri // TODO: Other metrics... job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any") job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw") + job.NetBwAvg = loadJobStat(&jobMeta, "net_bw") + job.FileBwAvg = loadJobStat(&jobMeta, "file_bw") job.RawResources, err = json.Marshal(job.Resources) if err != nil { diff --git a/metricdata/archive.go b/metricdata/archive.go index 9959b8f..e2aff03 100644 --- a/metricdata/archive.go +++ b/metricdata/archive.go @@ -208,39 +208,50 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { return jobMeta, nil } - dirPath, err := getPath(job, "", false) + dir, err := getPath(job, "", false) if err != nil { return nil, err } - if err := os.MkdirAll(dirPath, 0777); err != nil { - return nil, err - } - - f, err := os.Create(path.Join(dirPath, "meta.json")) - if err != nil { - return nil, err - } - defer f.Close() - writer := bufio.NewWriter(f) - if err := json.NewEncoder(writer).Encode(jobMeta); err != nil { - return nil, err - } - if err := writer.Flush(); err != nil { - return nil, err - } - - f, err = os.Create(path.Join(dirPath, "data.json")) - if err != nil { - return nil, err - } - writer = bufio.NewWriter(f) - if err := json.NewEncoder(writer).Encode(jobData); err != nil { - return nil, err - } - if err := writer.Flush(); err != nil { - return nil, err - } - - return jobMeta, f.Close() + return jobMeta, writeFiles(dir, jobMeta, &jobData) +} + +func writeFiles(dir string, jobMeta *schema.JobMeta, jobData *schema.JobData) error { + if err := os.MkdirAll(dir, 0777); err != nil { + return err + } + + f, err := os.Create(path.Join(dir, "meta.json")) + if err != nil { + return err + } + if err := json.NewEncoder(f).Encode(jobMeta); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + + f, err = 
os.Create(path.Join(dir, "data.json")) + if err != nil { + return err + } + if err := json.NewEncoder(f).Encode(jobData); err != nil { + return err + } + return f.Close() +} + +// Used to import a non-running job into the job-archive. +func ImportJob(job *schema.JobMeta, jobData *schema.JobData) error { + dir, err := getPath(&schema.Job{ + BaseJob: job.BaseJob, + StartTimeUnix: job.StartTime, + StartTime: time.Unix(job.StartTime, 0), + }, "", false) + if err != nil { + return err + } + + return writeFiles(dir, job, jobData) } diff --git a/repository/import.go b/repository/import.go new file mode 100644 index 0000000..ac0b293 --- /dev/null +++ b/repository/import.go @@ -0,0 +1,123 @@ +package repository + +import ( + "bytes" + "database/sql" + "encoding/json" + "errors" + "fmt" + "os" + "strings" + "time" + + "github.com/ClusterCockpit/cc-backend/log" + "github.com/ClusterCockpit/cc-backend/metricdata" + "github.com/ClusterCockpit/cc-backend/schema" +) + +const NamedJobInsert string = `INSERT INTO job ( + job_id, user, project, cluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc, + exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data, + mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total +) VALUES ( + :job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, + :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data, + :mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total +);` + +// Import all jobs specified as `<path-to-meta.json>:<path-to-data.json>,...` +func (r *JobRepository) HandleImportFlag(flag string) error { + for _, pair := range strings.Split(flag, ",") { + files := strings.Split(pair, ":") + if len(files) != 2 { + return errors.New("invalid import flag format") + } + + raw, err := os.ReadFile(files[0]) + if err != 
nil { + return err + } + + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.DisallowUnknownFields() + jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults} + if err := dec.Decode(&jobMeta); err != nil { + return err + } + + raw, err = os.ReadFile(files[1]) + if err != nil { + return err + } + + dec = json.NewDecoder(bytes.NewReader(raw)) + dec.DisallowUnknownFields() + jobData := schema.JobData{} + if err := dec.Decode(&jobData); err != nil { + return err + } + + if err := r.ImportJob(&jobMeta, &jobData); err != nil { + return err + } + } + return nil +} + +func (r *JobRepository) ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobData) (err error) { + jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful + if err := metricdata.ImportJob(jobMeta, jobData); err != nil { + return err + } + + if job, err := r.Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows { + if err != nil { + return err + } + + return fmt.Errorf("a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID) + } + + job := schema.Job{ + BaseJob: jobMeta.BaseJob, + StartTime: time.Unix(jobMeta.StartTime, 0), + StartTimeUnix: jobMeta.StartTime, + } + + // TODO: Other metrics... 
+ job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any") + job.MemBwAvg = loadJobStat(jobMeta, "mem_bw") + job.NetBwAvg = loadJobStat(jobMeta, "net_bw") + job.FileBwAvg = loadJobStat(jobMeta, "file_bw") + job.RawResources, err = json.Marshal(job.Resources) + if err != nil { + return err + } + + res, err := r.DB.NamedExec(NamedJobInsert, job) + if err != nil { + return err + } + + id, err := res.LastInsertId() + if err != nil { + return err + } + + for _, tag := range job.Tags { + if _, err := r.AddTagOrCreate(id, tag.Type, tag.Name); err != nil { + return err + } + } + + log.Infof("Successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id) + return nil +} + +func loadJobStat(job *schema.JobMeta, metric string) float64 { + if stats, ok := job.Statistics[metric]; ok { + return stats.Avg + } + + return 0.0 +} diff --git a/repository/job.go b/repository/job.go index 98710a2..910dabc 100644 --- a/repository/job.go +++ b/repository/job.go @@ -122,6 +122,7 @@ func (r *JobRepository) Stop( return } +// TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC; func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, limit *int) (map[string]int, error) { if !aggreg.IsValid() { return nil, errors.New("invalid aggregate") diff --git a/server.go b/server.go index 8b50130..eb7acc1 100644 --- a/server.go +++ b/server.go @@ -238,7 +238,7 @@ var routes []Route = []Route{ func main() { var flagReinitDB, flagStopImmediately, flagSyncLDAP bool - var flagConfigFile string + var flagConfigFile, flagImportJob string var flagNewUser, flagDelUser, flagGenJWT string flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize `job`, `tag`, and `jobtag` tables") flag.BoolVar(&flagSyncLDAP, "sync-ldap", 
false, "Sync the `user` table with ldap") @@ -247,6 +247,7 @@ func main() { flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `<username>:[admin,api,user]:<password>`") flag.StringVar(&flagDelUser, "del-user", "", "Remove user by username") flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by the username") + flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `<path-to-meta.json>:<path-to-data.json>,...`") flag.Parse() if err := loadEnv("./.env"); err != nil && !os.IsNotExist(err) { @@ -254,11 +255,14 @@ } if flagConfigFile != "" { - data, err := os.ReadFile(flagConfigFile) + f, err := os.Open(flagConfigFile) if err != nil { log.Fatal(err) } - if err := json.Unmarshal(data, &programConfig); err != nil { + defer f.Close() + dec := json.NewDecoder(f) + dec.DisallowUnknownFields() + if err := dec.Decode(&programConfig); err != nil { log.Fatal(err) } } @@ -356,17 +360,23 @@ func main() { } } + jobRepo = &repository.JobRepository{DB: db} + if err := jobRepo.Init(); err != nil { + log.Fatal(err) + } + + if flagImportJob != "" { + if err := jobRepo.HandleImportFlag(flagImportJob); err != nil { + log.Fatalf("import failed: %s", err.Error()) + } + } + if flagStopImmediately { return } // Build routes... - jobRepo = &repository.JobRepository{DB: db} - if err := jobRepo.Init(); err != nil { - log.Fatal(err) - } - resolver := &graph.Resolver{DB: db, Repo: jobRepo} graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver})) if os.Getenv("DEBUG") != "1" {