diff --git a/init-db.go b/init-db.go index e913307..ce8f9e7 100644 --- a/init-db.go +++ b/init-db.go @@ -4,14 +4,20 @@ import ( "bufio" "database/sql" "encoding/json" - "github.com/jmoiron/sqlx" - "log" + "fmt" "os" "path/filepath" "strings" + "time" + + "github.com/jmoiron/sqlx" ) func initDB(db *sqlx.DB, archive string) error { + starttime := time.Now() + fmt.Println("Building database...") + + // Basic database structure: _, err := db.Exec(` DROP TABLE IF EXISTS job; DROP TABLE IF EXISTS tag; @@ -44,10 +50,7 @@ func initDB(db *sqlx.DB, archive string) error { tag_id INTEGER, PRIMARY KEY (job_id, tag_id), FOREIGN KEY (job_id) REFERENCES job (id) ON DELETE CASCADE ON UPDATE NO ACTION, - FOREIGN KEY (tag_id) REFERENCES tag (id) ON DELETE CASCADE ON UPDATE NO ACTION); - - CREATE INDEX job_by_user ON job (user_id); - CREATE INDEX job_by_starttime ON job (start_time);`) + FOREIGN KEY (tag_id) REFERENCES tag (id) ON DELETE CASCADE ON UPDATE NO ACTION);`) if err != nil { return err } @@ -57,6 +60,15 @@ func initDB(db *sqlx.DB, archive string) error { return err } + insertstmt, err := db.Prepare(`INSERT INTO job + (job_id, user_id, project_id, cluster_id, start_time, duration, job_state, num_nodes, node_list, metadata, flops_any_avg, mem_bw_avg, net_bw_avg, file_bw_avg, load_avg) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`) + if err != nil { + return err + } + + var tx *sql.Tx = nil + var i int = 0 tags := make(map[string]int64) for _, entry0 := range entries0 { entries1, err := os.ReadDir(filepath.Join(archive, entry0.Name())) @@ -75,27 +87,59 @@ func initDB(db *sqlx.DB, archive string) error { } for _, entry2 := range entries2 { - if err = loadJob(db, tags, filepath.Join(archive, entry0.Name(), entry1.Name(), entry2.Name())); err != nil { + // Bundle 200 inserts into one transaction for better performance: + if i%200 == 0 { + if tx != nil { + if err := tx.Commit(); err != nil { + return err + } + } + + tx, err = db.Begin() + if err != nil { + return err + } + + insertstmt = tx.Stmt(insertstmt) + fmt.Printf("%d jobs inserted...\r", i) + } + + if err = loadJob(tx, insertstmt, tags, filepath.Join(archive, entry0.Name(), entry1.Name(), entry2.Name())); err != nil { return err } + + i += 1 } } } + if err := tx.Commit(); err != nil { + return err + } + + // Create indexes after inserts so that they do not + // need to be continually updated. + if _, err := db.Exec(` + CREATE INDEX job_by_user ON job (user_id); + CREATE INDEX job_by_starttime ON job (start_time);`); err != nil { + return err + } + + fmt.Printf("A total of %d jobs have been registered in %.3f seconds.\n", i, time.Since(starttime).Seconds()) return nil } type JobMetaFile struct { - JobId string `json:"job_id"` - UserId string `json:"user_id"` - ProjectId string `json:"project_id"` - ClusterId string `json:"cluster_id"` - NumNodes int `json:"num_nodes"` - JobState string `json:"job_state"` - StartTime int64 `json:"start_time"` - Duration int64 `json:"duration"` - Nodes []string `json:"nodes"` - Tags []struct { + JobId string `json:"job_id"` + UserId string `json:"user_id"` + ProjectId string `json:"project_id"` + ClusterId string `json:"cluster_id"` + NumNodes int `json:"num_nodes"` + JobState string `json:"job_state"` + StartTime int64 `json:"start_time"` + Duration int64 `json:"duration"` + Nodes []string `json:"nodes"` + Tags []struct { Name string `json:"name"` Type string `json:"type"` } `json:"tags"` @@ -107,7 +151,7 @@ type JobMetaFile struct { } `json:"statistics"` } -func loadJob(db *sqlx.DB, tags map[string]int64, path string) error { +func loadJob(tx *sql.Tx, stmt *sql.Stmt, tags map[string]int64, path string) error { f, err := os.Open(filepath.Join(path, "meta.json")) if err != nil { return err @@ -125,14 +169,8 @@ func loadJob(db *sqlx.DB, tags map[string]int64, path string) error { fileBwAvg := loadJobStat(&job, "file_bw") loadAvg := loadJobStat(&job, "load_one") - res, err := db.Exec(` - INSERT INTO job - (job_id, user_id, project_id, cluster_id, start_time, duration, job_state, num_nodes, node_list, metadata, - flops_any_avg, mem_bw_avg, net_bw_avg, file_bw_avg, load_avg) - VALUES - (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`, - job.JobId, job.UserId, job.ProjectId, job.ClusterId, job.StartTime, job.Duration, job.JobState, job.NumNodes, strings.Join(job.Nodes, ","), nil, - flopsAnyAvg, memBwAvg, netBwAvg, fileBwAvg, loadAvg) + res, err := stmt.Exec(job.JobId, job.UserId, job.ProjectId, job.ClusterId, job.StartTime, job.Duration, job.JobState, + job.NumNodes, strings.Join(job.Nodes, ","), nil, flopsAnyAvg, memBwAvg, netBwAvg, fileBwAvg, loadAvg) if err != nil { return err } @@ -142,15 +180,11 @@ func loadJob(db *sqlx.DB, tags map[string]int64, path string) error { return err } - if id % 50 == 0 { - log.Printf("Inserting Job (id: %d, jobId: %s, clusterId: %s)\n", id, job.JobId, job.ClusterId) - } - for _, tag := range job.Tags { tagstr := tag.Name + ":" + tag.Type tagId, ok := tags[tagstr] if !ok { - res, err := db.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type) + res, err := tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type) if err != nil { return err } @@ -161,7 +195,7 @@ func loadJob(db *sqlx.DB, tags map[string]int64, path string) error { tags[tagstr] = tagId } - if _, err := db.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, id, tagId); err != nil { + if _, err := tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, id, tagId); err != nil { return err } }