Synchronize DB Schema with json schema

Rework tests
This commit is contained in:
Jan Eitzinger
2023-04-21 12:59:27 +02:00
parent a8980c7a5e
commit 500ae29d25
16 changed files with 352 additions and 80 deletions

View File

@@ -56,7 +56,10 @@ func Connect(driver string, db string) {
}
dbConnInstance = &DBConnection{DB: dbHandle, Driver: driver}
checkDBVersion(driver, dbHandle.DB)
err = checkDBVersion(driver, dbHandle.DB)
if err != nil {
log.Fatal(err)
}
})
}

View File

@@ -9,6 +9,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"math"
"os"
"strings"
"time"
@@ -319,31 +320,71 @@ func loadJobStat(job *schema.JobMeta, metric string) float64 {
return 0.0
}
func getNormalizationFactor(v float64) (float64, int) {
count := 0
scale := -3
if v > 1000.0 {
for v > 1000.0 {
v *= 1e-3
count++
}
} else {
for v < 1.0 {
v *= 1e3
count++
}
scale = 3
}
return math.Pow10(count * scale), count * scale
}
func normalize(avg float64, u schema.Unit) (float64, schema.Unit) {
f, e := getNormalizationFactor(avg)
if e != 0 {
p := units.NewPrefixFromFactor(units.NewPrefix(*u.Prefix), e)
np := p.Prefix()
return f, schema.Unit{Prefix: &np, Base: u.Base}
}
return f, u
}
func checkJobData(d *schema.JobData) error {
for _, scopes := range *d {
var newUnit string
// Add node scope if missing
// var newUnit schema.Unit
// TODO Add node scope if missing
for _, metric := range scopes {
if strings.Contains(metric.Unit.Base, "B/s") ||
strings.Contains(metric.Unit.Base, "F/s") ||
strings.Contains(metric.Unit.Base, "B") {
// First get overall avg
// get overall avg
sum := 0.0
for _, s := range metric.Series {
sum += s.Statistics.Avg
}
avg := sum / float64(len(metric.Series))
f, u := normalize(avg, metric.Unit)
for _, s := range metric.Series {
fp := schema.ConvertFloatToFloat64(s.Data)
// Normalize values with new unit prefix
oldUnit := metric.Unit.Base
units.NormalizeSeries(fp, avg, oldUnit, &newUnit)
s.Data = schema.GetFloat64ToFloat(fp)
if u.Prefix != metric.Unit.Prefix {
for _, s := range metric.Series {
fp := schema.ConvertFloatToFloat64(s.Data)
for i := 0; i < len(fp); i++ {
fp[i] *= f
fp[i] = math.Ceil(fp[i])
}
s.Data = schema.GetFloat64ToFloat(fp)
}
metric.Unit.Prefix = u.Prefix
}
metric.Unit.Base = newUnit
}
}
}

View File

@@ -0,0 +1,101 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"fmt"
"path/filepath"
"testing"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/units"
_ "github.com/mattn/go-sqlite3"
)
func setupRepo(t *testing.T) *JobRepository {
log.Init("info", true)
tmpdir := t.TempDir()
dbfilepath := filepath.Join(tmpdir, "test.db")
err := MigrateDB("sqlite3", dbfilepath)
if err != nil {
t.Fatal(err)
}
Connect("sqlite3", dbfilepath)
return GetJobRepository()
}
func TestNormalizeFactor(t *testing.T) {
// var us string
s := []float64{2890031237, 23998994567, 389734042344, 390349424345}
// r := []float64{3, 24, 390, 391}
total := 0.0
for _, number := range s {
total += number
}
avg := total / float64(len(s))
fmt.Printf("AVG: %e\n", avg)
f, e := getNormalizationFactor(avg)
fmt.Printf("Factor %e Count %d\n", f, e)
np := units.NewPrefix("")
fmt.Printf("Prefix %e Short %s\n", float64(np), np.Prefix())
p := units.NewPrefixFromFactor(np, e)
if p.Prefix() != "G" {
t.Errorf("Failed Prefix or unit: Want G, Got %s", p.Prefix())
}
}
func TestNormalizeKeep(t *testing.T) {
s := []float64{3.0, 24.0, 390.0, 391.0}
total := 0.0
for _, number := range s {
total += number
}
avg := total / float64(len(s))
fmt.Printf("AVG: %e\n", avg)
f, e := getNormalizationFactor(avg)
fmt.Printf("Factor %e Count %d\n", f, e)
np := units.NewPrefix("G")
fmt.Printf("Prefix %e Short %s\n", float64(np), np.Prefix())
p := units.NewPrefixFromFactor(np, e)
if p.Prefix() != "G" {
t.Errorf("Failed Prefix or unit: Want G, Got %s", p.Prefix())
}
}
//
// func TestHandleImportFlag(t *testing.T) {
// t.Error("wrong summary for diagnostic ")
// r := setupRepo(t)
//
// s := "../../test/repo/meta1.json:../../test/repo/data1.json"
// err := HandleImportFlag(s)
// if err != nil {
// t.Fatal(err)
// }
//
// jobId, cluster, startTime := int64(1404396), "emmy", int64(1609299584)
// job, err := r.Find(&jobId, &cluster, &startTime)
// if err != nil {
// t.Fatal(err)
// }
//
// if job.ID != 1366 {
// t.Errorf("wrong summary for diagnostic 3\ngot: %d \nwant: 1366", job.JobID)
// }
// }

View File

@@ -12,19 +12,21 @@ import (
_ "github.com/mattn/go-sqlite3"
)
func init() {
log.Init("info", true)
Connect("sqlite3", "../../test/test.db")
}
func setup(t *testing.T) *JobRepository {
log.Init("info", true)
dbfilepath := "../../test/test.db"
err := MigrateDB("sqlite3", dbfilepath)
if err != nil {
t.Fatal(err)
}
Connect("sqlite3", dbfilepath)
return GetJobRepository()
}
func TestFind(t *testing.T) {
r := setup(t)
jobId, cluster, startTime := int64(1404396), "emmy", int64(1609299584)
jobId, cluster, startTime := int64(398998), "fritz", int64(1675957496)
job, err := r.Find(&jobId, &cluster, &startTime)
if err != nil {
t.Fatal(err)
@@ -32,7 +34,7 @@ func TestFind(t *testing.T) {
// fmt.Printf("%+v", job)
if job.ID != 1366 {
if job.ID != 5 {
t.Errorf("wrong summary for diagnostic 3\ngot: %d \nwant: 1366", job.JobID)
}
}
@@ -40,14 +42,14 @@ func TestFind(t *testing.T) {
func TestFindById(t *testing.T) {
r := setup(t)
job, err := r.FindById(1366)
job, err := r.FindById(5)
if err != nil {
t.Fatal(err)
}
// fmt.Printf("%+v", job)
if job.JobID != 1404396 {
if job.JobID != 398998 {
t.Errorf("wrong summary for diagnostic 3\ngot: %d \nwant: 1404396", job.JobID)
}
}
@@ -63,7 +65,7 @@ func TestGetTags(t *testing.T) {
fmt.Printf("TAGS %+v \n", tags)
// fmt.Printf("COUNTS %+v \n", counts)
if counts["bandwidth"] != 6 {
t.Errorf("wrong summary for diagnostic 3\ngot: %d \nwant: 6", counts["load-imbalance"])
if counts["bandwidth"] != 3 {
t.Errorf("wrong tag count \ngot: %d \nwant: 3", counts["bandwidth"])
}
}

View File

@@ -8,7 +8,6 @@ import (
"database/sql"
"embed"
"fmt"
"os"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/golang-migrate/migrate/v4"
@@ -22,37 +21,37 @@ const Version uint = 3
//go:embed migrations/*
var migrationFiles embed.FS
func checkDBVersion(backend string, db *sql.DB) {
func checkDBVersion(backend string, db *sql.DB) error {
var m *migrate.Migrate
if backend == "sqlite3" {
driver, err := sqlite3.WithInstance(db, &sqlite3.Config{})
if err != nil {
log.Fatal(err)
return err
}
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
if err != nil {
log.Fatal(err)
return err
}
m, err = migrate.NewWithInstance("iofs", d, "sqlite3", driver)
if err != nil {
log.Fatal(err)
return err
}
} else if backend == "mysql" {
driver, err := mysql.WithInstance(db, &mysql.Config{})
if err != nil {
log.Fatal(err)
return err
}
d, err := iofs.New(migrationFiles, "migrations/mysql")
if err != nil {
log.Fatal(err)
return err
}
m, err = migrate.NewWithInstance("iofs", d, "mysql", driver)
if err != nil {
log.Fatal(err)
return err
}
}
@@ -61,22 +60,22 @@ func checkDBVersion(backend string, db *sql.DB) {
if err == migrate.ErrNilVersion {
log.Warn("Legacy database without version or missing database file!")
} else {
log.Fatal(err)
return err
}
}
if v < Version {
log.Warnf("Unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend --migrate-db", v, Version)
os.Exit(0)
return fmt.Errorf("Unsupported database version %d, need %d.\nPlease backup your database file and run cc-backend --migrate-db", v, Version)
}
if v > Version {
log.Warnf("Unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool!", v, Version)
os.Exit(0)
return fmt.Errorf("Unsupported database version %d, need %d.\nPlease refer to documentation how to downgrade db with external migrate tool!", v, Version)
}
return nil
}
func MigrateDB(backend string, db string) {
func MigrateDB(backend string, db string) error {
var m *migrate.Migrate
if backend == "sqlite3" {
@@ -87,17 +86,17 @@ func MigrateDB(backend string, db string) {
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("sqlite3://%s?_foreign_keys=on", db))
if err != nil {
log.Fatal(err)
return err
}
} else if backend == "mysql" {
d, err := iofs.New(migrationFiles, "migrations/mysql")
if err != nil {
log.Fatal(err)
return err
}
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("mysql://%s?multiStatements=true", db))
if err != nil {
log.Fatal(err)
return err
}
}
@@ -105,9 +104,10 @@ func MigrateDB(backend string, db string) {
if err == migrate.ErrNoChange {
log.Info("DB already up to date!")
} else {
log.Fatal(err)
return err
}
}
m.Close()
return nil
}

View File

@@ -7,19 +7,19 @@ CREATE TABLE IF NOT EXISTS job (
user VARCHAR(255) NOT NULL,
project VARCHAR(255) NOT NULL,
partition VARCHAR(255) NOT NULL,
array_job_id BIGINT NOT NULL,
duration INT NOT NULL DEFAULT 0,
walltime INT NOT NULL DEFAULT 0,
job_state VARCHAR(255) NOT NULL
partition VARCHAR(255),
array_job_id BIGINT,
duration INT NOT NULL,
walltime INT NOT NULL,
job_state VARCHAR(255) NOT NULL
CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled',
'stopped', 'timeout', 'preempted', 'out_of_memory')),
meta_data TEXT, -- JSON
resources TEXT NOT NULL, -- JSON
num_nodes INT NOT NULL,
num_hwthreads INT NOT NULL,
num_acc INT NOT NULL,
num_hwthreads INT,
num_acc INT,
smt TINYINT NOT NULL DEFAULT 1 CHECK(smt IN (0, 1 )),
exclusive TINYINT NOT NULL DEFAULT 1 CHECK(exclusive IN (0, 1, 2)),
monitoring_status TINYINT NOT NULL DEFAULT 1 CHECK(monitoring_status IN (0, 1, 2, 3)),

View File

@@ -11,12 +11,10 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/pkg/log"
_ "github.com/mattn/go-sqlite3"
)
func init() {
Connect("sqlite3", "../../test/test.db")
}
func setupUserTest(t *testing.T) *UserCfgRepo {
const testconfig = `{
"addr": "0.0.0.0:8080",
@@ -34,6 +32,15 @@ func setupUserTest(t *testing.T) *UserCfgRepo {
"startTime": { "from": "2022-01-01T00:00:00Z", "to": null }
} } ]
}`
log.Init("info", true)
dbfilepath := "../../test/test.db"
err := MigrateDB("sqlite3", dbfilepath)
if err != nil {
t.Fatal(err)
}
Connect("sqlite3", dbfilepath)
tmpdir := t.TempDir()
cfgFilePath := filepath.Join(tmpdir, "config.json")
if err := os.WriteFile(cfgFilePath, []byte(testconfig), 0666); err != nil {
@@ -43,9 +50,10 @@ func setupUserTest(t *testing.T) *UserCfgRepo {
config.Init(cfgFilePath)
return GetUserCfgRepo()
}
func TestGetUIConfig(t *testing.T) {
r := setupUserTest(t)
u := auth.User{Username: "jan"}
u := auth.User{Username: "demo"}
cfg, err := r.GetUIConfig(&u)
if err != nil {
@@ -53,10 +61,9 @@ func TestGetUIConfig(t *testing.T) {
}
tmp := cfg["plot_list_selectedMetrics"]
metrics := tmp.([]interface{})
str := metrics[2].(string)
if str != "mem_bw" {
metrics := tmp.([]string)
str := metrics[2]
if str != "mem_used" {
t.Errorf("wrong config\ngot: %s \nwant: mem_bw", str)
}
}