diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go
index a7327ba..0fe309e 100644
--- a/cmd/cc-backend/main.go
+++ b/cmd/cc-backend/main.go
@@ -29,6 +29,7 @@ import (
 	"github.com/ClusterCockpit/cc-backend/internal/config"
 	"github.com/ClusterCockpit/cc-backend/internal/graph"
 	"github.com/ClusterCockpit/cc-backend/internal/graph/generated"
+	"github.com/ClusterCockpit/cc-backend/internal/importer"
 	"github.com/ClusterCockpit/cc-backend/internal/metricdata"
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
 	"github.com/ClusterCockpit/cc-backend/internal/routerConfig"
@@ -199,13 +200,13 @@ func main() {
 	}
 
 	if flagReinitDB {
-		if err := repository.InitDB(); err != nil {
+		if err := importer.InitDB(); err != nil {
 			log.Fatalf("failed to re-initialize repository DB: %s", err.Error())
 		}
 	}
 
 	if flagImportJob != "" {
-		if err := repository.HandleImportFlag(flagImportJob); err != nil {
+		if err := importer.HandleImportFlag(flagImportJob); err != nil {
 			log.Fatalf("job import failed: %s", err.Error())
 		}
 	}
diff --git a/internal/api/rest.go b/internal/api/rest.go
index 5bad0ad..484f7a1 100644
--- a/internal/api/rest.go
+++ b/internal/api/rest.go
@@ -22,6 +22,7 @@ import (
 	"github.com/ClusterCockpit/cc-backend/internal/auth"
 	"github.com/ClusterCockpit/cc-backend/internal/graph"
 	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
+	"github.com/ClusterCockpit/cc-backend/internal/importer"
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
@@ -396,7 +397,7 @@ func (api *RestApi) startJob(rw http.ResponseWriter, r *http.Request) {
 	if req.State == "" {
 		req.State = schema.JobStateRunning
 	}
-	if err := repository.SanityChecks(&req.BaseJob); err != nil {
+	if err := importer.SanityChecks(&req.BaseJob); err != nil {
 		handleError(err, http.StatusBadRequest, rw)
 		return
 	}
diff --git a/internal/importer/handleImport.go b/internal/importer/handleImport.go
new file mode 100644
index 0000000..48b224f
--- /dev/null
+++ b/internal/importer/handleImport.go
@@ -0,0 +1,131 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package importer
+
+import (
+	"bytes"
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/ClusterCockpit/cc-backend/internal/config"
+	"github.com/ClusterCockpit/cc-backend/internal/repository"
+	"github.com/ClusterCockpit/cc-backend/pkg/archive"
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+)
+
+// Import all jobs specified as `<path-to-meta.json>:<path-to-data.json>,...`
+func HandleImportFlag(flag string) error {
+	r := repository.GetJobRepository()
+
+	for _, pair := range strings.Split(flag, ",") {
+		files := strings.Split(pair, ":")
+		if len(files) != 2 {
+			return fmt.Errorf("REPOSITORY/INIT > invalid import flag format")
+		}
+
+		raw, err := os.ReadFile(files[0])
+		if err != nil {
+			log.Warn("Error while reading metadata file for import")
+			return err
+		}
+
+		if config.Keys.Validate {
+			if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil {
+				return fmt.Errorf("REPOSITORY/INIT > validate job meta: %v", err)
+			}
+		}
+		dec := json.NewDecoder(bytes.NewReader(raw))
+		dec.DisallowUnknownFields()
+		jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
+		if err := dec.Decode(&jobMeta); err != nil {
+			log.Warn("Error while decoding raw json metadata for import")
+			return err
+		}
+
+		raw, err = os.ReadFile(files[1])
+		if err != nil {
+			log.Warn("Error while reading jobdata file for import")
+			return err
+		}
+
+		if config.Keys.Validate {
+			if err := schema.Validate(schema.Data, bytes.NewReader(raw)); err != nil {
+				return fmt.Errorf("REPOSITORY/INIT > validate job data: %v", err)
+			}
+		}
+		dec = json.NewDecoder(bytes.NewReader(raw))
+		dec.DisallowUnknownFields()
+		jobData := schema.JobData{}
+		if err := dec.Decode(&jobData); err != nil {
+			log.Warn("Error while decoding raw json jobdata for import")
+			return err
+		}
+
+		//checkJobData(&jobData)
+		// SanityChecks(&jobMeta.BaseJob)
+		jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
+		if job, err := r.Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
+			if err != nil {
+				log.Warn("Error while finding job in jobRepository")
+				return err
+			}
+
+			return fmt.Errorf("REPOSITORY/INIT > a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID)
+		}
+
+		job := schema.Job{
+			BaseJob:       jobMeta.BaseJob,
+			StartTime:     time.Unix(jobMeta.StartTime, 0),
+			StartTimeUnix: jobMeta.StartTime,
+		}
+
+		// TODO: Other metrics...
+		job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any")
+		job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw")
+		job.NetBwAvg = loadJobStat(&jobMeta, "net_bw")
+		job.FileBwAvg = loadJobStat(&jobMeta, "file_bw")
+		job.RawResources, err = json.Marshal(job.Resources)
+		if err != nil {
+			log.Warn("Error while marshaling job resources")
+			return err
+		}
+		job.RawMetaData, err = json.Marshal(job.MetaData)
+		if err != nil {
+			log.Warn("Error while marshaling job metadata")
+			return err
+		}
+
+		if err := SanityChecks(&job.BaseJob); err != nil {
+			log.Warn("BaseJob SanityChecks failed")
+			return err
+		}
+
+		if err := archive.GetHandle().ImportJob(&jobMeta, &jobData); err != nil {
+			log.Error("Error while importing job")
+			return err
+		}
+
+		id, err := r.InsertJob(&job)
+		if err != nil {
+			log.Warn("Error while job db insert")
+			return err
+		}
+
+		for _, tag := range job.Tags {
+			if _, err := r.AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
+				log.Error("Error while adding or creating tag")
+				return err
+			}
+		}
+
+		log.Infof("successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
+	}
+	return nil
+}
diff --git a/internal/repository_test/setup.go b/internal/importer/importer_test.go
similarity index 66%
rename from internal/repository_test/setup.go
rename to internal/importer/importer_test.go
index 209dcc3..e31bab8 100644
--- a/internal/repository_test/setup.go
+++ b/internal/importer/importer_test.go
@@ -2,16 +2,18 @@
 // All rights reserved.
 // Use of this source code is governed by a MIT-style
 // license that can be found in the LICENSE file.
-package repository_test
+package importer_test
 
 import (
 	"encoding/json"
 	"fmt"
 	"os"
 	"path/filepath"
+	"strings"
 	"testing"
 
 	"github.com/ClusterCockpit/cc-backend/internal/config"
+	"github.com/ClusterCockpit/cc-backend/internal/importer"
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
@@ -110,3 +112,62 @@ func setupRepo(t *testing.T) *repository.JobRepository {
 	repository.Connect("sqlite3", dbfilepath)
 	return repository.GetJobRepository()
 }
+
+type Result struct {
+	JobId     int64
+	Cluster   string
+	StartTime int64
+	Duration  int32
+}
+
+func readResult(t *testing.T, testname string) Result {
+	var r Result
+
+	content, err := os.ReadFile(filepath.Join("testdata",
+		fmt.Sprintf("%s-golden.json", testname)))
+	if err != nil {
+		t.Fatal("Error when opening file: ", err)
+	}
+
+	err = json.Unmarshal(content, &r)
+	if err != nil {
+		t.Fatal("Error during Unmarshal(): ", err)
+	}
+
+	return r
+}
+
+func TestHandleImportFlag(t *testing.T) {
+	r := setupRepo(t)
+
+	tests, err := filepath.Glob(filepath.Join("testdata", "*.input"))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, path := range tests {
+		_, filename := filepath.Split(path)
+		str := strings.Split(strings.TrimSuffix(filename, ".input"), "-")
+		testname := str[1]
+
+		t.Run(testname, func(t *testing.T) {
+			s := fmt.Sprintf("%s:%s", filepath.Join("testdata",
+				fmt.Sprintf("meta-%s.input", testname)),
+				filepath.Join("testdata", fmt.Sprintf("data-%s.json", testname)))
+			err := importer.HandleImportFlag(s)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			result := readResult(t, testname)
+			job, err := r.Find(&result.JobId, &result.Cluster, &result.StartTime)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if job.Duration != result.Duration {
+				t.Errorf("wrong duration for job\ngot: %d \nwant: %d", job.Duration, result.Duration)
+			}
+
+		})
+	}
+}
diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go
new file mode 100644
index 0000000..0bfd13b
--- /dev/null
+++ b/internal/importer/initDB.go
@@ -0,0 +1,225 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package importer
+
+import (
+	"encoding/json"
+	"fmt"
+	"math"
+	"strings"
+	"time"
+
+	"github.com/ClusterCockpit/cc-backend/internal/repository"
+	"github.com/ClusterCockpit/cc-backend/pkg/archive"
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	"github.com/ClusterCockpit/cc-backend/pkg/schema"
+	"github.com/ClusterCockpit/cc-backend/pkg/units"
+)
+
+// Delete the tables "job", "tag" and "jobtag" from the database and
+// repopulate them using the jobs found in `archive`.
+func InitDB() error {
+	r := repository.GetJobRepository()
+	starttime := time.Now()
+	log.Print("Building job table...")
+
+	t, err := r.TransactionInit()
+	if err != nil {
+		log.Warn("Error while initializing SQL transactions")
+		return err
+	}
+	tags := make(map[string]int64)
+
+	// Not using log.Print because we want the line to end with `\r` and
+	// this function is only ever called when a special command line flag
+	// is passed anyways.
+	fmt.Printf("%d jobs inserted...\r", 0)
+
+	ar := archive.GetHandle()
+	i := 0
+	errorOccured := 0
+
+	for jobContainer := range ar.Iter(false) {
+
+		jobMeta := jobContainer.Meta
+
+		// Bundle 100 inserts into one transaction for better performance
+		if i%100 == 0 {
+			r.TransactionCommit(t)
+			fmt.Printf("%d jobs inserted...\r", i)
+		}
+
+		jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
+		job := schema.Job{
+			BaseJob:       jobMeta.BaseJob,
+			StartTime:     time.Unix(jobMeta.StartTime, 0),
+			StartTimeUnix: jobMeta.StartTime,
+		}
+
+		// TODO: Other metrics...
+		job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any")
+		job.MemBwAvg = loadJobStat(jobMeta, "mem_bw")
+		job.NetBwAvg = loadJobStat(jobMeta, "net_bw")
+		job.FileBwAvg = loadJobStat(jobMeta, "file_bw")
+
+		job.RawResources, err = json.Marshal(job.Resources)
+		if err != nil {
+			log.Errorf("repository initDB(): %v", err)
+			errorOccured++
+			continue
+		}
+
+		job.RawMetaData, err = json.Marshal(job.MetaData)
+		if err != nil {
+			log.Errorf("repository initDB(): %v", err)
+			errorOccured++
+			continue
+		}
+
+		if err := SanityChecks(&job.BaseJob); err != nil {
+			log.Errorf("repository initDB(): %v", err)
+			errorOccured++
+			continue
+		}
+
+		id, err := r.TransactionAdd(t, job)
+		if err != nil {
+			log.Errorf("repository initDB(): %v", err)
+			errorOccured++
+			continue
+		}
+
+		for _, tag := range job.Tags {
+			tagstr := tag.Name + ":" + tag.Type
+			tagId, ok := tags[tagstr]
+			if !ok {
+				tagId, err = r.TransactionAddTag(t, tag)
+				if err != nil {
+					log.Errorf("Error adding tag: %v", err)
+					errorOccured++
+					continue
+				}
+				tags[tagstr] = tagId
+			}
+
+			r.TransactionSetTag(t, id, tagId)
+		}
+
+		if err == nil {
+			i += 1
+		}
+	}
+
+	if errorOccured > 0 {
+		log.Warnf("Error in import of %d jobs!", errorOccured)
+	}
+
+	r.TransactionEnd(t)
+	log.Printf("A total of %d jobs have been registered in %.3f seconds.\n", i, time.Since(starttime).Seconds())
+	return nil
+}
+
+// This function also sets the subcluster if necessary!
+func SanityChecks(job *schema.BaseJob) error {
+	if c := archive.GetCluster(job.Cluster); c == nil {
+		return fmt.Errorf("no such cluster: %v", job.Cluster)
+	}
+	if err := archive.AssignSubCluster(job); err != nil {
+		log.Warn("Error while assigning subcluster to job")
+		return err
+	}
+	if !job.State.Valid() {
+		return fmt.Errorf("not a valid job state: %v", job.State)
+	}
+	if len(job.Resources) == 0 || len(job.User) == 0 {
+		return fmt.Errorf("'resources' and 'user' should not be empty")
+	}
+	if *job.NumAcc < 0 || *job.NumHWThreads < 0 || job.NumNodes < 1 {
+		return fmt.Errorf("'numNodes', 'numAcc' or 'numHWThreads' invalid")
+	}
+	if len(job.Resources) != int(job.NumNodes) {
+		return fmt.Errorf("len(resources) does not equal numNodes (%d vs %d)", len(job.Resources), job.NumNodes)
+	}
+
+	return nil
+}
+
+func loadJobStat(job *schema.JobMeta, metric string) float64 {
+	if stats, ok := job.Statistics[metric]; ok {
+		return stats.Avg
+	}
+
+	return 0.0
+}
+
+func getNormalizationFactor(v float64) (float64, int) {
+	count := 0
+	scale := -3
+
+	if v > 1000.0 {
+		for v > 1000.0 {
+			v *= 1e-3
+			count++
+		}
+	} else {
+		for v < 1.0 {
+			v *= 1e3
+			count++
+		}
+		scale = 3
+	}
+	return math.Pow10(count * scale), count * scale
+}
+
+func normalize(avg float64, p string) (float64, string) {
+	f, e := getNormalizationFactor(avg)
+
+	if e != 0 {
+		np := units.NewPrefixFromFactor(units.NewPrefix(p), e)
+		return f, np.Prefix()
+	}
+
+	return f, p
+}
+
+func checkJobData(d *schema.JobData) error {
+	for _, scopes := range *d {
+		// var newUnit schema.Unit
+		// TODO Add node scope if missing
+		for _, metric := range scopes {
+			if strings.Contains(metric.Unit.Base, "B/s") ||
+				strings.Contains(metric.Unit.Base, "F/s") ||
+				strings.Contains(metric.Unit.Base, "B") {
+
+				// get overall avg
+				sum := 0.0
+				for _, s := range metric.Series {
+					sum += s.Statistics.Avg
+				}
+
+				avg := sum / float64(len(metric.Series))
+				f, p := normalize(avg, metric.Unit.Prefix)
+
+				if p != metric.Unit.Prefix {
+
+					fmt.Printf("Convert %e", f)
+					// for _, s := range metric.Series {
+					// 	fp := schema.ConvertFloatToFloat64(s.Data)
+					//
+					// 	for i := 0; i < len(fp); i++ {
+					// 		fp[i] *= f
+					// 		fp[i] = math.Ceil(fp[i])
+					// 	}
+					//
+					// 	s.Data = schema.GetFloat64ToFloat(fp)
+					// }
+
+					metric.Unit.Prefix = p
+				}
+			}
+		}
+	}
+	return nil
+}
diff --git a/internal/importer/initDB_test.go b/internal/importer/initDB_test.go
new file mode 100644
index 0000000..64016a4
--- /dev/null
+++ b/internal/importer/initDB_test.go
@@ -0,0 +1,64 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package importer
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/ClusterCockpit/cc-backend/pkg/units"
+)
+
+func TestNormalizeFactor(t *testing.T) {
+	// var us string
+	s := []float64{2890031237, 23998994567, 389734042344, 390349424345}
+	// r := []float64{3, 24, 390, 391}
+
+	total := 0.0
+	for _, number := range s {
+		total += number
+	}
+	avg := total / float64(len(s))
+
+	fmt.Printf("AVG: %e\n", avg)
+	f, e := getNormalizationFactor(avg)
+
+	fmt.Printf("Factor %e Count %d\n", f, e)
+
+	np := units.NewPrefix("")
+
+	fmt.Printf("Prefix %e Short %s\n", float64(np), np.Prefix())
+
+	p := units.NewPrefixFromFactor(np, e)
+
+	if p.Prefix() != "G" {
+		t.Errorf("Failed Prefix or unit: Want G, Got %s", p.Prefix())
+	}
+}
+
+func TestNormalizeKeep(t *testing.T) {
+	s := []float64{3.0, 24.0, 390.0, 391.0}
+
+	total := 0.0
+	for _, number := range s {
+		total += number
+	}
+	avg := total / float64(len(s))
+
+	fmt.Printf("AVG: %e\n", avg)
+	f, e := getNormalizationFactor(avg)
+
+	fmt.Printf("Factor %e Count %d\n", f, e)
+
+	np := units.NewPrefix("G")
+
+	fmt.Printf("Prefix %e Short %s\n", float64(np), np.Prefix())
+
+	p := units.NewPrefixFromFactor(np, e)
+
+	if p.Prefix() != "G" {
+		t.Errorf("Failed Prefix or unit: Want G, Got %s", p.Prefix())
+	}
+}
diff --git a/internal/repository_test/testdata/cluster-fritz.json b/internal/importer/testdata/cluster-fritz.json
similarity index 100%
rename from internal/repository_test/testdata/cluster-fritz.json
rename to internal/importer/testdata/cluster-fritz.json
diff --git a/internal/repository_test/testdata/data-fritzError.json b/internal/importer/testdata/data-fritzError.json
similarity index 100%
rename from internal/repository_test/testdata/data-fritzError.json
rename to internal/importer/testdata/data-fritzError.json
diff --git a/internal/repository_test/testdata/data-fritzMinimal.json b/internal/importer/testdata/data-fritzMinimal.json
similarity index 100%
rename from internal/repository_test/testdata/data-fritzMinimal.json
rename to internal/importer/testdata/data-fritzMinimal.json
diff --git a/internal/repository_test/testdata/data-taurus.json b/internal/importer/testdata/data-taurus.json
similarity index 100%
rename from internal/repository_test/testdata/data-taurus.json
rename to internal/importer/testdata/data-taurus.json
diff --git a/internal/repository_test/testdata/meta-fritzError.input b/internal/importer/testdata/meta-fritzError.input
similarity index 100%
rename from internal/repository_test/testdata/meta-fritzError.input
rename to internal/importer/testdata/meta-fritzError.input
diff --git a/internal/repository_test/testdata/meta-fritzMinimal.input b/internal/importer/testdata/meta-fritzMinimal.input
similarity index 100%
rename from internal/repository_test/testdata/meta-fritzMinimal.input
rename to internal/importer/testdata/meta-fritzMinimal.input
diff --git a/internal/repository_test/testdata/meta-taurus.input b/internal/importer/testdata/meta-taurus.input
similarity index 100%
rename from internal/repository_test/testdata/meta-taurus.input
rename to internal/importer/testdata/meta-taurus.input
diff --git a/internal/repository_test/testdata/taurus-golden.json b/internal/importer/testdata/taurus-golden.json
similarity index 100%
rename from internal/repository_test/testdata/taurus-golden.json
rename to internal/importer/testdata/taurus-golden.json
diff --git a/internal/repository/init.go b/internal/repository/init.go
deleted file mode 100644
index 80e7fa1..0000000
--- a/internal/repository/init.go
+++ /dev/null
@@ -1,391 +0,0 @@
-// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
-// All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-package repository
-
-import (
-	"bytes"
-	"database/sql"
-	"encoding/json"
-	"fmt"
-	"math"
-	"os"
-	"strings"
-	"time"
-
-	"github.com/ClusterCockpit/cc-backend/internal/config"
-	"github.com/ClusterCockpit/cc-backend/pkg/archive"
-	"github.com/ClusterCockpit/cc-backend/pkg/log"
-	"github.com/ClusterCockpit/cc-backend/pkg/schema"
-	"github.com/ClusterCockpit/cc-backend/pkg/units"
-)
-
-const NamedJobInsert string = `INSERT INTO job (
-	job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
-	exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
-	mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
-) VALUES (
-	:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
-	:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data,
-	:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
-);`
-
-// Import all jobs specified as `<path-to-meta.json>:<path-to-data.json>,...`
-func HandleImportFlag(flag string) error {
-	for _, pair := range strings.Split(flag, ",") {
-		files := strings.Split(pair, ":")
-		if len(files) != 2 {
-			return fmt.Errorf("REPOSITORY/INIT > invalid import flag format")
-		}
-
-		raw, err := os.ReadFile(files[0])
-		if err != nil {
-			log.Warn("Error while reading metadata file for import")
-			return err
-		}
-
-		if config.Keys.Validate {
-			if err := schema.Validate(schema.Meta, bytes.NewReader(raw)); err != nil {
-				return fmt.Errorf("REPOSITORY/INIT > validate job meta: %v", err)
-			}
-		}
-		dec := json.NewDecoder(bytes.NewReader(raw))
-		dec.DisallowUnknownFields()
-		jobMeta := schema.JobMeta{BaseJob: schema.JobDefaults}
-		if err := dec.Decode(&jobMeta); err != nil {
-			log.Warn("Error while decoding raw json metadata for import")
-			return err
-		}
-
-		raw, err = os.ReadFile(files[1])
-		if err != nil {
-			log.Warn("Error while reading jobdata file for import")
-			return err
-		}
-
-		if config.Keys.Validate {
-			if err := schema.Validate(schema.Data, bytes.NewReader(raw)); err != nil {
-				return fmt.Errorf("REPOSITORY/INIT > validate job data: %v", err)
-			}
-		}
-		dec = json.NewDecoder(bytes.NewReader(raw))
-		dec.DisallowUnknownFields()
-		jobData := schema.JobData{}
-		if err := dec.Decode(&jobData); err != nil {
-			log.Warn("Error while decoding raw json jobdata for import")
-			return err
-		}
-
-		//checkJobData(&jobData)
-		// SanityChecks(&jobMeta.BaseJob)
-		jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
-		if job, err := GetJobRepository().Find(&jobMeta.JobID, &jobMeta.Cluster, &jobMeta.StartTime); err != sql.ErrNoRows {
-			if err != nil {
-				log.Warn("Error while finding job in jobRepository")
-				return err
-			}
-
-			return fmt.Errorf("REPOSITORY/INIT > a job with that jobId, cluster and startTime does already exist (dbid: %d)", job.ID)
-		}
-
-		job := schema.Job{
-			BaseJob:       jobMeta.BaseJob,
-			StartTime:     time.Unix(jobMeta.StartTime, 0),
-			StartTimeUnix: jobMeta.StartTime,
-		}
-
-		// TODO: Other metrics...
-		job.FlopsAnyAvg = loadJobStat(&jobMeta, "flops_any")
-		job.MemBwAvg = loadJobStat(&jobMeta, "mem_bw")
-		job.NetBwAvg = loadJobStat(&jobMeta, "net_bw")
-		job.FileBwAvg = loadJobStat(&jobMeta, "file_bw")
-		job.RawResources, err = json.Marshal(job.Resources)
-		if err != nil {
-			log.Warn("Error while marshaling job resources")
-			return err
-		}
-		job.RawMetaData, err = json.Marshal(job.MetaData)
-		if err != nil {
-			log.Warn("Error while marshaling job metadata")
-			return err
-		}
-
-		if err := SanityChecks(&job.BaseJob); err != nil {
-			log.Warn("BaseJob SanityChecks failed")
-			return err
-		}
-
-		if err := archive.GetHandle().ImportJob(&jobMeta, &jobData); err != nil {
-			log.Error("Error while importing job")
-			return err
-		}
-
-		res, err := GetConnection().DB.NamedExec(NamedJobInsert, job)
-		if err != nil {
-			log.Warn("Error while NamedJobInsert")
-			return err
-		}
-
-		id, err := res.LastInsertId()
-		if err != nil {
-			log.Warn("Error while getting last insert ID")
-			return err
-		}
-
-		for _, tag := range job.Tags {
-			if _, err := GetJobRepository().AddTagOrCreate(id, tag.Type, tag.Name); err != nil {
-				log.Error("Error while adding or creating tag")
-				return err
-			}
-		}
-
-		log.Infof("successfully imported a new job (jobId: %d, cluster: %s, dbid: %d)", job.JobID, job.Cluster, id)
-	}
-	return nil
-}
-
-// Delete the tables "job", "tag" and "jobtag" from the database and
-// repopulate them using the jobs found in `archive`.
-func InitDB() error {
-	db := GetConnection()
-	starttime := time.Now()
-	log.Print("Building job table...")
-
-	// Inserts are bundled into transactions because in sqlite,
-	// that speeds up inserts A LOT.
-	tx, err := db.DB.Beginx()
-	if err != nil {
-		log.Warn("Error while bundling transactions")
-		return err
-	}
-
-	stmt, err := tx.PrepareNamed(NamedJobInsert)
-	if err != nil {
-		log.Warn("Error while preparing namedJobInsert")
-		return err
-	}
-	tags := make(map[string]int64)
-
-	// Not using log.Print because we want the line to end with `\r` and
-	// this function is only ever called when a special command line flag
-	// is passed anyways.
-	fmt.Printf("%d jobs inserted...\r", 0)
-
-	ar := archive.GetHandle()
-	i := 0
-	errorOccured := 0
-
-	for jobContainer := range ar.Iter(false) {
-
-		jobMeta := jobContainer.Meta
-
-		// // Bundle 100 inserts into one transaction for better performance:
-		if i%10 == 0 {
-			if tx != nil {
-				if err := tx.Commit(); err != nil {
-					log.Warn("Error while committing transactions for jobMeta")
-					return err
-				}
-			}
-
-			tx, err = db.DB.Beginx()
-			if err != nil {
-				log.Warn("Error while bundling transactions for jobMeta")
-				return err
-			}
-
-			stmt = tx.NamedStmt(stmt)
-			fmt.Printf("%d jobs inserted...\r", i)
-		}
-
-		jobMeta.MonitoringStatus = schema.MonitoringStatusArchivingSuccessful
-		job := schema.Job{
-			BaseJob:       jobMeta.BaseJob,
-			StartTime:     time.Unix(jobMeta.StartTime, 0),
-			StartTimeUnix: jobMeta.StartTime,
-		}
-
-		// TODO: Other metrics...
-		job.FlopsAnyAvg = loadJobStat(jobMeta, "flops_any")
-		job.MemBwAvg = loadJobStat(jobMeta, "mem_bw")
-		job.NetBwAvg = loadJobStat(jobMeta, "net_bw")
-		job.FileBwAvg = loadJobStat(jobMeta, "file_bw")
-
-		job.RawResources, err = json.Marshal(job.Resources)
-		if err != nil {
-			log.Errorf("repository initDB(): %v", err)
-			errorOccured++
-			continue
-		}
-
-		job.RawMetaData, err = json.Marshal(job.MetaData)
-		if err != nil {
-			log.Errorf("repository initDB(): %v", err)
-			errorOccured++
-			continue
-		}
-
-		if err := SanityChecks(&job.BaseJob); err != nil {
-			log.Errorf("repository initDB(): %v", err)
-			errorOccured++
-			continue
-		}
-
-		res, err := stmt.Exec(job)
-		if err != nil {
-			log.Errorf("repository initDB(): %v", err)
-			errorOccured++
-			continue
-		}
-
-		id, err := res.LastInsertId()
-		if err != nil {
-			log.Errorf("repository initDB(): %v", err)
-			errorOccured++
-			continue
-		}
-
-		for _, tag := range job.Tags {
-			tagstr := tag.Name + ":" + tag.Type
-			tagId, ok := tags[tagstr]
-			if !ok {
-				res, err := tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
-				if err != nil {
-					log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
-					return err
-				}
-				tagId, err = res.LastInsertId()
-				if err != nil {
-					log.Warn("Error while getting last insert ID")
-					return err
-				}
-				tags[tagstr] = tagId
-			}
-
-			if _, err := tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, id, tagId); err != nil {
-				log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", id, tagId)
-				return err
-			}
-		}
-
-		if err == nil {
-			i += 1
-		}
-	}
-
-	if errorOccured > 0 {
-		log.Warnf("Error in import of %d jobs!", errorOccured)
-	}
-
-	if err := tx.Commit(); err != nil {
-		log.Warn("Error while committing SQL transactions")
-		return err
-	}
-
-	log.Printf("A total of %d jobs have been registered in %.3f seconds.\n", i, time.Since(starttime).Seconds())
-	return nil
-}
-
-// This function also sets the subcluster if necessary!
-func SanityChecks(job *schema.BaseJob) error {
-	if c := archive.GetCluster(job.Cluster); c == nil {
-		return fmt.Errorf("no such cluster: %v", job.Cluster)
-	}
-	if err := archive.AssignSubCluster(job); err != nil {
-		log.Warn("Error while assigning subcluster to job")
-		return err
-	}
-	if !job.State.Valid() {
-		return fmt.Errorf("not a valid job state: %v", job.State)
-	}
-	if len(job.Resources) == 0 || len(job.User) == 0 {
-		return fmt.Errorf("'resources' and 'user' should not be empty")
-	}
-	if *job.NumAcc < 0 || *job.NumHWThreads < 0 || job.NumNodes < 1 {
-		return fmt.Errorf("'numNodes', 'numAcc' or 'numHWThreads' invalid")
-	}
-	if len(job.Resources) != int(job.NumNodes) {
-		return fmt.Errorf("len(resources) does not equal numNodes (%d vs %d)", len(job.Resources), job.NumNodes)
-	}
-
-	return nil
-}
-
-func loadJobStat(job *schema.JobMeta, metric string) float64 {
-	if stats, ok := job.Statistics[metric]; ok {
-		return stats.Avg
-	}
-
-	return 0.0
-}
-
-func getNormalizationFactor(v float64) (float64, int) {
-	count := 0
-	scale := -3
-
-	if v > 1000.0 {
-		for v > 1000.0 {
-			v *= 1e-3
-			count++
-		}
-	} else {
-		for v < 1.0 {
-			v *= 1e3
-			count++
-		}
-		scale = 3
-	}
-	return math.Pow10(count * scale), count * scale
-}
-
-func normalize(avg float64, p string) (float64, string) {
-	f, e := getNormalizationFactor(avg)
-
-	if e != 0 {
-		np := units.NewPrefixFromFactor(units.NewPrefix(p), e)
-		return f, np.Prefix()
-	}
-
-	return f, p
-}
-
-func checkJobData(d *schema.JobData) error {
-	for _, scopes := range *d {
-		// var newUnit schema.Unit
-		// TODO Add node scope if missing
-		for _, metric := range scopes {
-			if strings.Contains(metric.Unit.Base, "B/s") ||
-				strings.Contains(metric.Unit.Base, "F/s") ||
-				strings.Contains(metric.Unit.Base, "B") {
-
-				// get overall avg
-				sum := 0.0
-				for _, s := range metric.Series {
-					sum += s.Statistics.Avg
-				}
-
-				avg := sum / float64(len(metric.Series))
-				f, p := normalize(avg, metric.Unit.Prefix)
-
-				if p != metric.Unit.Prefix {
-
-					fmt.Printf("Convert %e", f)
-					// for _, s := range metric.Series {
-					// 	fp := schema.ConvertFloatToFloat64(s.Data)
-					//
-					// 	for i := 0; i < len(fp); i++ {
-					// 		fp[i] *= f
-					// 		fp[i] = math.Ceil(fp[i])
-					// 	}
-					//
-					// 	s.Data = schema.GetFloat64ToFloat(fp)
-					// }
-
-					metric.Unit.Prefix = p
-				}
-			}
-		}
-	}
-	return nil
-}
diff --git a/internal/repository/init_test.go b/internal/repository/init_test.go
index 332a137..4f5930c 100644
--- a/internal/repository/init_test.go
+++ b/internal/repository/init_test.go
@@ -6,26 +6,11 @@ package repository
 
 import (
 	"fmt"
-	"path/filepath"
 	"testing"
 
-	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	"github.com/ClusterCockpit/cc-backend/pkg/units"
-	_ "github.com/mattn/go-sqlite3"
 )
 
-func setupRepo(t *testing.T) *JobRepository {
-	log.Init("info", true)
-	tmpdir := t.TempDir()
-	dbfilepath := filepath.Join(tmpdir, "test.db")
-	err := MigrateDB("sqlite3", dbfilepath)
-	if err != nil {
-		t.Fatal(err)
-	}
-	Connect("sqlite3", dbfilepath)
-	return GetJobRepository()
-}
-
 func TestNormalizeFactor(t *testing.T) {
 	// var us string
 	s := []float64{2890031237, 23998994567, 389734042344, 390349424345}
diff --git a/internal/repository/job.go b/internal/repository/job.go
index 81a19ec..6c504ef 100644
--- a/internal/repository/job.go
+++ b/internal/repository/job.go
@@ -913,3 +913,121 @@ func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context,
 	log.Infof("Timer jobsStatisticsHistogram %s", time.Since(start))
 	return points, nil
 }
+
+const NamedJobInsert string = `INSERT INTO job (
+	job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
+	exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
+	mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
+) VALUES (
+	:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
+	:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data,
+	:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
+);`
+
+func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
+	res, err := r.DB.NamedExec(NamedJobInsert, job)
+	if err != nil {
+		log.Warn("Error while NamedJobInsert")
+		return 0, err
+	}
+	id, err := res.LastInsertId()
+	if err != nil {
+		log.Warn("Error while getting last insert ID")
+		return 0, err
+	}
+
+	return id, nil
+}
+
+type Transaction struct {
+	tx   *sqlx.Tx
+	stmt *sqlx.NamedStmt
+}
+
+func (r *JobRepository) TransactionInit() (*Transaction, error) {
+	var err error
+	t := new(Transaction)
+	// Inserts are bundled into transactions because in sqlite,
+	// that speeds up inserts A LOT.
+	t.tx, err = r.DB.Beginx()
+	if err != nil {
+		log.Warn("Error while bundling transactions")
+		return nil, err
+	}
+
+	t.stmt, err = t.tx.PrepareNamed(NamedJobInsert)
+	if err != nil {
+		log.Warn("Error while preparing namedJobInsert")
+		return nil, err
+	}
+
+	return t, nil
+}
+
+func (r *JobRepository) TransactionCommit(t *Transaction) error {
+	var err error
+	if t.tx != nil {
+		if err = t.tx.Commit(); err != nil {
+			log.Warn("Error while committing transactions")
+			return err
+		}
+	}
+
+	t.tx, err = r.DB.Beginx()
+	if err != nil {
+		log.Warn("Error while bundling transactions")
+		return err
+	}
+
+	t.stmt = t.tx.NamedStmt(t.stmt)
+	return nil
+}
+
+func (r *JobRepository) TransactionEnd(t *Transaction) error {
+	if err := t.tx.Commit(); err != nil {
+		log.Warn("Error while committing SQL transactions")
+		return err
+	}
+
+	return nil
+}
+
+func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, error) {
+	res, err := t.stmt.Exec(job)
+	if err != nil {
+		log.Errorf("repository initDB(): %v", err)
+		return 0, err
+	}
+
+	id, err := res.LastInsertId()
+	if err != nil {
+		log.Errorf("repository initDB(): %v", err)
+		return 0, err
+	}
+
+	return id, nil
+}
+
+func (r *JobRepository) TransactionAddTag(t *Transaction, tag *schema.Tag) (int64, error) {
+	res, err := t.tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
+	if err != nil {
+		log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
+		return 0, err
+	}
+	tagId, err := res.LastInsertId()
+	if err != nil {
+		log.Warn("Error while getting last insert ID")
+		return 0, err
+	}
+
+	return tagId, nil
+}
+
+func (r *JobRepository) TransactionSetTag(t *Transaction, jobId int64, tagId int64) error {
+	if _, err := t.tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobId, tagId); err != nil {
+		log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", jobId, tagId)
+		return err
+	}
+
+	return nil
+}
diff --git a/internal/repository_test/importFlag.go b/internal/repository_test/importFlag.go
deleted file mode 100644
index 2b5d737..0000000
--- a/internal/repository_test/importFlag.go
+++ /dev/null
@@ -1,69 +0,0 @@
-// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
-// All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-package repository_test
-
-import (
-	"bytes"
-	"go/format"
-	"os"
-	"path/filepath"
-	"testing"
-)
-
-func TestHandleImportFlag(t *testing.T) {
-	r := setupRepo(t)
-
-	tests, err := filepath.Glob(filepath.Join("testdata", "*.input"))
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	for _, path := range tests {
-		_, filename := filepath.Split(path)
-		testname := filename[:len(filename)-len(filepath.Ext(path))]
-
-		t.Run(testname, func(t *testing.T) {
-			source, err := os.ReadFile(path)
-			if err != nil {
-				t.Fatal("error reading source file:", err)
-			}
-
-			// >>> This is the actual code under test.
-			output, err := format.Source(source)
-			if err != nil {
-				t.Fatal("error formatting:", err)
-			}
-			// <<<
-
-			// Each input file is expected to have a "golden output" file, with the
-			// same path except the .input extension is replaced by .golden
-			goldenfile := filepath.Join("testdata", testname+".golden")
-			want, err := os.ReadFile(goldenfile)
-			if err != nil {
-				t.Fatal("error reading golden file:", err)
-			}
-
-			if !bytes.Equal(output, want) {
-				t.Errorf("\n==== got:\n%s\n==== want:\n%s\n", output, want)
-			}
-		})
-	}
-
-	s := "../../test/repo/meta1.json:../../test/repo/data1.json"
-	err := HandleImportFlag(s)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	jobId, cluster, startTime := int64(398764), "fritz", int64(1675954353)
-	job, err := r.Find(&jobId, &cluster, &startTime)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	if job.ID != 2 {
-		t.Errorf("wrong summary for diagnostic 3\ngot: %d \nwant: 1366", job.JobID)
-	}
-}
diff --git a/internal/repository_test/importFlag_test.go b/internal/repository_test/importFlag_test.go
deleted file mode 100644
index 3767ea3..0000000
--- a/internal/repository_test/importFlag_test.go
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
-// All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-package repository_test
-
-import (
-	"encoding/json"
-	"fmt"
-	"os"
-	"path/filepath"
-	"strings"
-	"testing"
-
-	"github.com/ClusterCockpit/cc-backend/internal/repository"
-)
-
-type Result struct {
-	JobId     int64
-	Cluster   string
-	StartTime int64
-	Duration  int32
-}
-
-func readResult(t *testing.T, testname string) Result {
-	var r Result
-
-	content, err := os.ReadFile(filepath.Join("testdata",
-		fmt.Sprintf("%s-golden.json", testname)))
-	if err != nil {
-		t.Fatal("Error when opening file: ", err)
-	}
-
-	err = json.Unmarshal(content, &r)
-	if err != nil {
-		t.Fatal("Error during Unmarshal(): ", err)
-	}
-
-	return r
-}
-
-func TestHandleImportFlag(t *testing.T) {
-	r := setupRepo(t)
-
-	tests, err := filepath.Glob(filepath.Join("testdata", "*.input"))
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	for _, path := range tests {
-		_, filename := filepath.Split(path)
-		str := strings.Split(strings.TrimSuffix(filename, ".input"), "-")
-		testname := str[1]
-
-		t.Run(testname, func(t *testing.T) {
-			s := fmt.Sprintf("%s:%s", filepath.Join("testdata",
-				fmt.Sprintf("meta-%s.input", testname)),
-				filepath.Join("testdata", fmt.Sprintf("data-%s.json", testname)))
-			err := repository.HandleImportFlag(s)
-			if err != nil {
-				t.Fatal(err)
-			}
-
-			result := readResult(t, testname)
-			job, err := r.Find(&result.JobId, &result.Cluster, &result.StartTime)
-			if err != nil {
-				t.Fatal(err)
-			}
-			if job.Duration != result.Duration {
-				t.Errorf("wrong duration for job\ngot: %d \nwant: %d", job.Duration, result.Duration)
-			}
-
-		})
-	}
-}
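
The Transaction helpers added to internal/repository/job.go package up the prepare/commit/reopen dance that InitDB previously inlined. A minimal usage sketch in Go, not part of the patch: `bulkInsert` and its `jobs` argument are hypothetical names for illustration, while the repository calls and the batch size of 100 come straight from the hunks above.

	// bulkInsert batch-inserts jobs via the Transaction helpers from this patch.
	func bulkInsert(jobs []schema.Job) error {
		r := repository.GetJobRepository()
		t, err := r.TransactionInit() // Beginx() + PrepareNamed(NamedJobInsert)
		if err != nil {
			return err
		}
		for i, job := range jobs {
			// Commit every 100 inserts: sqlite is much faster with batched transactions.
			if i%100 == 0 {
				if err := r.TransactionCommit(t); err != nil { // commit batch, reopen tx, rebind stmt
					return err
				}
			}
			if _, err := r.TransactionAdd(t, job); err != nil { // named insert, returns new row id
				return err
			}
		}
		return r.TransactionEnd(t) // final commit
	}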