Add import command line flag

This commit is contained in:
Lou Knauer 2022-02-24 11:54:36 +01:00
parent f304698823
commit 49fbfc23d4
5 changed files with 189 additions and 48 deletions

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/ClusterCockpit/cc-backend/log" "github.com/ClusterCockpit/cc-backend/log"
"github.com/ClusterCockpit/cc-backend/repository"
"github.com/ClusterCockpit/cc-backend/schema" "github.com/ClusterCockpit/cc-backend/schema"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
) )
@ -97,15 +98,7 @@ func initDB(db *sqlx.DB, archive string) error {
return err return err
} }
stmt, err := tx.PrepareNamed(`INSERT INTO job ( stmt, err := tx.PrepareNamed(repository.NamedJobInsert)
job_id, user, project, cluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data,
mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
) VALUES (
:job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data,
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
);`)
if err != nil { if err != nil {
return err return err
} }
@ -192,6 +185,7 @@ func initDB(db *sqlx.DB, archive string) error {
return nil return nil
} }
// TODO: Remove double logic, use repository/import.go!
// Read the `meta.json` file at `path` and insert it to the database using the prepared // Read the `meta.json` file at `path` and insert it to the database using the prepared
// insert statement `stmt`. `tags` maps all existing tags to their database ID. // insert statement `stmt`. `tags` maps all existing tags to their database ID.
func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path string) error { func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path string) error {
@ -216,6 +210,8 @@ func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path stri
// TODO: Other metrics... // TODO: Other metrics...
job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any") job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any")
job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw") job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw")
job.NetBwAvg = loadJobStat(&jobMeta, "net_bw")
job.FileBwAvg = loadJobStat(&jobMeta, "file_bw")
job.RawResources, err = json.Marshal(job.Resources) job.RawResources, err = json.Marshal(job.Resources)
if err != nil { if err != nil {

View File

@ -208,39 +208,50 @@ func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) {
return jobMeta, nil return jobMeta, nil
} }
dirPath, err := getPath(job, "", false) dir, err := getPath(job, "", false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := os.MkdirAll(dirPath, 0777); err != nil { return jobMeta, writeFiles(dir, jobMeta, &jobData)
return nil, err
} }
f, err := os.Create(path.Join(dirPath, "meta.json")) func writeFiles(dir string, jobMeta *schema.JobMeta, jobData *schema.JobData) error {
if err := os.MkdirAll(dir, 0777); err != nil {
return err
}
f, err := os.Create(path.Join(dir, "meta.json"))
if err != nil { if err != nil {
return nil, err return err
} }
defer f.Close() if err := json.NewEncoder(f).Encode(jobMeta); err != nil {
writer := bufio.NewWriter(f) return err
if err := json.NewEncoder(writer).Encode(jobMeta); err != nil {
return nil, err
} }
if err := writer.Flush(); err != nil { if err := f.Close(); err != nil {
return nil, err return err
} }
f, err = os.Create(path.Join(dirPath, "data.json")) f, err = os.Create(path.Join(dir, "data.json"))
if err != nil { if err != nil {
return nil, err return err
} }
writer = bufio.NewWriter(f) if err := json.NewEncoder(f).Encode(jobData); err != nil {
if err := json.NewEncoder(writer).Encode(jobData); err != nil { return err
return nil, err
} }
if err := writer.Flush(); err != nil { return f.Close()
return nil, err
} }
return jobMeta, f.Close() // Used to import a non-running job into the job-archive.
func ImportJob(job *schema.JobMeta, jobData *schema.JobData) error {
dir, err := getPath(&schema.Job{
BaseJob: job.BaseJob,
StartTimeUnix: job.StartTime,
StartTime: time.Unix(job.StartTime, 0),
}, "", false)
if err != nil {
return err
}
return writeFiles(dir, job, jobData)
} }

123
repository/import.go Normal file
View File

@ -0,0 +1,123 @@
package repository
import (
"bytes"
"database/sql"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"time"
"github.com/ClusterCockpit/cc-backend/log"
"github.com/ClusterCockpit/cc-backend/metricdata"
"github.com/ClusterCockpit/cc-backend/schema"
)
const NamedJobInsert string = `INSERT INTO job (
job_id, user, project, cluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data,
mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
) VALUES (
:job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data,
:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
);`
// Import all jobs specified as `<path-to-meta.json>:<path-to-data.json>,...`
func (r *JobRepository) HandleImportFlag(flag string) error {
for _, pair := range strings.Split(flag, ",") {
files := strings.Split(pair, ":")
if len(files) != 2 {
return errors.New("invalid import flag format")
}
raw, err := os.ReadFile(files[0])
if err != nil {
return err
}
dec := json.NewDecoder(bytes.NewReader(raw))
dec.DisallowUnknownFields()
jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
if err := dec.Decode(&jobMeta); err != nil {
return err
}
raw, err = os.ReadFile(files[1])
if err != nil {
return err
}
dec = json.NewDecoder(bytes.NewReader(raw))
dec.DisallowUnknownFields()
jobData := schema.JobData{}
if err := dec.Decode(&jobData); err != nil {
return err
}
if err := r.ImportJob(&jobMeta, &jobData); err != nil {
return err
}
}
return nil
}
func (r *JobRepository) ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobData) (err error) {
jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
if err := metricdata.ImportJob(jobMeta, jobData); err != nil {
return err
}
if job, err := r.Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
if err != nil {
return err
}
return fmt.Errorf("a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID)
}
job := schema.Job{
BaseJob: jobMeta.BaseJob,
StartTime: time.Unix(jobMeta.StartTime, 0),
StartTimeUnix: jobMeta.StartTime,
}
// TODO: Other metrics...
job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any")
job.MemBwAvg = loadJobStat(jobMeta, "mem_bw")
job.NetBwAvg = loadJobStat(jobMeta, "net_bw")
job.FileBwAvg = loadJobStat(jobMeta, "file_bw")
job.RawResources, err = json.Marshal(job.Resources)
if err != nil {
return err
}
res, err := r.DB.NamedExec(NamedJobInsert, job)
if err != nil {
return err
}
id, err := res.LastInsertId()
if err != nil {
return err
}
for _, tag := range job.Tags {
if _, err := r.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
return err
}
}
log.Infof("Successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
return nil
}
func loadJobStat(job *schema.JobMeta, metric string) float64 {
if stats, ok := job.Statistics[metric]; ok {
return stats.Avg
}
return 0.0
}

View File

@ -122,6 +122,7 @@ func (r *JobRepository) Stop(
return return
} }
// TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, limit *int) (map[string]int, error) { func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, limit *int) (map[string]int, error) {
if !aggreg.IsValid() { if !aggreg.IsValid() {
return nil, errors.New("invalid aggregate") return nil, errors.New("invalid aggregate")

View File

@ -238,7 +238,7 @@ var routes []Route = []Route{
func main() { func main() {
var flagReinitDB, flagStopImmediately, flagSyncLDAP bool var flagReinitDB, flagStopImmediately, flagSyncLDAP bool
var flagConfigFile string var flagConfigFile, flagImportJob string
var flagNewUser, flagDelUser, flagGenJWT string var flagNewUser, flagDelUser, flagGenJWT string
flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize `job`, `tag`, and `jobtag` tables") flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize `job`, `tag`, and `jobtag` tables")
flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the `user` table with ldap") flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the `user` table with ldap")
@ -247,6 +247,7 @@ func main() {
flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `<username>:[admin,api,user]:<password>`") flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `<username>:[admin,api,user]:<password>`")
flag.StringVar(&flagDelUser, "del-user", "", "Remove user by username") flag.StringVar(&flagDelUser, "del-user", "", "Remove user by username")
flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by the username") flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by the username")
flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `<path-to-meta.json>:<path-to-data.json>,...`")
flag.Parse() flag.Parse()
if err := loadEnv("./.env"); err != nil && !os.IsNotExist(err) { if err := loadEnv("./.env"); err != nil && !os.IsNotExist(err) {
@ -254,11 +255,14 @@ func main() {
} }
if flagConfigFile != "" { if flagConfigFile != "" {
data, err := os.ReadFile(flagConfigFile) f, err := os.Open(flagConfigFile)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
if err := json.Unmarshal(data, &programConfig); err != nil { defer f.Close()
dec := json.NewDecoder(f)
dec.DisallowUnknownFields()
if err := dec.Decode(&programConfig); err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }
@ -356,17 +360,23 @@ func main() {
} }
} }
jobRepo = &repository.JobRepository{DB: db}
if err := jobRepo.Init(); err != nil {
log.Fatal(err)
}
if flagImportJob != "" {
if err := jobRepo.HandleImportFlag(flagImportJob); err != nil {
log.Fatalf("import failed: %s", err.Error())
}
}
if flagStopImmediately { if flagStopImmediately {
return return
} }
// Build routes... // Build routes...
jobRepo = &repository.JobRepository{DB: db}
if err := jobRepo.Init(); err != nil {
log.Fatal(err)
}
resolver := &graph.Resolver{DB: db, Repo: jobRepo} resolver := &graph.Resolver{DB: db, Repo: jobRepo}
graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver})) graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver}))
if os.Getenv("DEBUG") != "1" { if os.Getenv("DEBUG") != "1" {