mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-12-31 02:46:16 +01:00
Merge branch 'dev' of github.com:ClusterCockpit/cc-backend into dev
This commit is contained in:
@@ -141,7 +141,7 @@ func setup(t *testing.T) *api.RestAPI {
|
||||
}
|
||||
|
||||
dbfilepath := filepath.Join(tmpdir, "test.db")
|
||||
err := repository.MigrateDB("sqlite3", dbfilepath)
|
||||
err := repository.MigrateDB(dbfilepath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -79,8 +79,11 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) {
|
||||
// Slurm node state
|
||||
r.HandleFunc("/nodestate/", api.updateNodeStates).Methods(http.MethodPost, http.MethodPut)
|
||||
// Job Handler
|
||||
r.HandleFunc("/jobs/start_job/", api.startJob).Methods(http.MethodPost, http.MethodPut)
|
||||
r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut)
|
||||
if config.Keys.APISubjects == nil {
|
||||
cclog.Info("Enabling REST start/stop job API")
|
||||
r.HandleFunc("/jobs/start_job/", api.startJob).Methods(http.MethodPost, http.MethodPut)
|
||||
r.HandleFunc("/jobs/stop_job/", api.stopJobByRequest).Methods(http.MethodPost, http.MethodPut)
|
||||
}
|
||||
r.HandleFunc("/jobs/", api.getJobs).Methods(http.MethodGet)
|
||||
r.HandleFunc("/jobs/{id}", api.getJobByID).Methods(http.MethodPost)
|
||||
r.HandleFunc("/jobs/{id}", api.getCompleteJobByID).Methods(http.MethodGet)
|
||||
|
||||
@@ -37,10 +37,10 @@ type ProgramConfig struct {
|
||||
EmbedStaticFiles bool `json:"embed-static-files"`
|
||||
StaticFiles string `json:"static-files"`
|
||||
|
||||
// 'sqlite3' or 'mysql' (mysql will work for mariadb as well)
|
||||
// Database driver - only 'sqlite3' is supported
|
||||
DBDriver string `json:"db-driver"`
|
||||
|
||||
// For sqlite3 a filename, for mysql a DSN in this format: https://github.com/go-sql-driver/mysql#dsn-data-source-name (Without query parameters!).
|
||||
// Path to SQLite database file
|
||||
DB string `json:"db"`
|
||||
|
||||
// Keep all metric data in the metric data repositories,
|
||||
|
||||
@@ -41,7 +41,7 @@ var configSchema = `
|
||||
"type": "string"
|
||||
},
|
||||
"db": {
|
||||
"description": "For sqlite3 a filename, for mysql a DSN in this format: https://github.com/go-sql-driver/mysql#dsn-data-source-name (Without query parameters!).",
|
||||
"description": "Path to SQLite database file (e.g., './var/job.db')",
|
||||
"type": "string"
|
||||
},
|
||||
"disable-archive": {
|
||||
|
||||
@@ -88,14 +88,14 @@ func (r *jobResolver) EnergyFootprint(ctx context.Context, obj *schema.Job) ([]*
|
||||
res := []*model.EnergyFootprintValue{}
|
||||
for name, value := range rawEnergyFootprint {
|
||||
// Suboptimal: Nearly hardcoded metric name expectations
|
||||
matchCpu := regexp.MustCompile(`cpu|Cpu|CPU`)
|
||||
matchCPU := regexp.MustCompile(`cpu|Cpu|CPU`)
|
||||
matchAcc := regexp.MustCompile(`acc|Acc|ACC`)
|
||||
matchMem := regexp.MustCompile(`mem|Mem|MEM`)
|
||||
matchCore := regexp.MustCompile(`core|Core|CORE`)
|
||||
|
||||
hwType := ""
|
||||
switch test := name; { // NOtice ';' for var declaration
|
||||
case matchCpu.MatchString(test):
|
||||
case matchCPU.MatchString(test):
|
||||
hwType = "CPU"
|
||||
case matchAcc.MatchString(test):
|
||||
hwType = "Accelerator"
|
||||
@@ -175,9 +175,9 @@ func (r *mutationResolver) AddTagsToJob(ctx context.Context, job string, tagIds
|
||||
}
|
||||
|
||||
tags := []*schema.Tag{}
|
||||
for _, tagId := range tagIds {
|
||||
for _, tagID := range tagIds {
|
||||
// Get ID
|
||||
tid, err := strconv.ParseInt(tagId, 10, 64)
|
||||
tid, err := strconv.ParseInt(tagID, 10, 64)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while parsing tag id")
|
||||
return nil, err
|
||||
@@ -222,9 +222,9 @@ func (r *mutationResolver) RemoveTagsFromJob(ctx context.Context, job string, ta
|
||||
}
|
||||
|
||||
tags := []*schema.Tag{}
|
||||
for _, tagId := range tagIds {
|
||||
for _, tagID := range tagIds {
|
||||
// Get ID
|
||||
tid, err := strconv.ParseInt(tagId, 10, 64)
|
||||
tid, err := strconv.ParseInt(tagID, 10, 64)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while parsing tag id")
|
||||
return nil, err
|
||||
@@ -265,9 +265,9 @@ func (r *mutationResolver) RemoveTagFromList(ctx context.Context, tagIds []strin
|
||||
}
|
||||
|
||||
tags := []int{}
|
||||
for _, tagId := range tagIds {
|
||||
for _, tagID := range tagIds {
|
||||
// Get ID
|
||||
tid, err := strconv.ParseInt(tagId, 10, 64)
|
||||
tid, err := strconv.ParseInt(tagID, 10, 64)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while parsing tag id for removal")
|
||||
return nil, err
|
||||
@@ -317,7 +317,7 @@ func (r *nodeResolver) SchedulerState(ctx context.Context, obj *schema.Node) (sc
|
||||
if obj.NodeState != "" {
|
||||
return obj.NodeState, nil
|
||||
} else {
|
||||
return "", fmt.Errorf("No SchedulerState (NodeState) on Object")
|
||||
return "", fmt.Errorf("no SchedulerState (NodeState) on Object")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -343,6 +343,14 @@ func (r *queryResolver) Tags(ctx context.Context) ([]*schema.Tag, error) {
|
||||
|
||||
// GlobalMetrics is the resolver for the globalMetrics field.
|
||||
func (r *queryResolver) GlobalMetrics(ctx context.Context) ([]*schema.GlobalMetricListItem, error) {
|
||||
user := repository.GetUserFromContext(ctx)
|
||||
|
||||
if user != nil {
|
||||
if user.HasRole(schema.RoleUser) || user.HasRole(schema.RoleManager) {
|
||||
return archive.GlobalUserMetricList, nil
|
||||
}
|
||||
}
|
||||
|
||||
return archive.GlobalMetricList, nil
|
||||
}
|
||||
|
||||
@@ -373,12 +381,12 @@ func (r *queryResolver) AllocatedNodes(ctx context.Context, cluster string) ([]*
|
||||
// Node is the resolver for the node field.
|
||||
func (r *queryResolver) Node(ctx context.Context, id string) (*schema.Node, error) {
|
||||
repo := repository.GetNodeRepository()
|
||||
numericId, err := strconv.ParseInt(id, 10, 64)
|
||||
numericID, err := strconv.ParseInt(id, 10, 64)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while parsing job id")
|
||||
return nil, err
|
||||
}
|
||||
return repo.GetNodeByID(numericId, false)
|
||||
return repo.GetNodeByID(numericID, false)
|
||||
}
|
||||
|
||||
// Nodes is the resolver for the nodes field.
|
||||
@@ -405,8 +413,7 @@ func (r *queryResolver) NodeStates(ctx context.Context, filter []*model.NodeFilt
|
||||
return nil, herr
|
||||
}
|
||||
|
||||
allCounts := make([]*model.NodeStates, 0)
|
||||
allCounts = append(stateCounts, healthCounts...)
|
||||
allCounts := append(stateCounts, healthCounts...)
|
||||
|
||||
return allCounts, nil
|
||||
}
|
||||
@@ -433,18 +440,18 @@ func (r *queryResolver) NodeStatesTimed(ctx context.Context, filter []*model.Nod
|
||||
return healthCounts, nil
|
||||
}
|
||||
|
||||
return nil, errors.New("Unknown Node State Query Type")
|
||||
return nil, errors.New("unknown Node State Query Type")
|
||||
}
|
||||
|
||||
// Job is the resolver for the job field.
|
||||
func (r *queryResolver) Job(ctx context.Context, id string) (*schema.Job, error) {
|
||||
numericId, err := strconv.ParseInt(id, 10, 64)
|
||||
numericID, err := strconv.ParseInt(id, 10, 64)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while parsing job id")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
job, err := r.Repo.FindByID(ctx, numericId)
|
||||
job, err := r.Repo.FindByID(ctx, numericID)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while finding job by id")
|
||||
return nil, err
|
||||
@@ -809,7 +816,7 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
|
||||
nodeRepo := repository.GetNodeRepository()
|
||||
nodes, stateMap, countNodes, hasNextPage, nerr := nodeRepo.GetNodesForList(ctx, cluster, subCluster, stateFilter, nodeFilter, page)
|
||||
if nerr != nil {
|
||||
return nil, errors.New("Could not retrieve node list required for resolving NodeMetricsList")
|
||||
return nil, errors.New("could not retrieve node list required for resolving NodeMetricsList")
|
||||
}
|
||||
|
||||
if metrics == nil {
|
||||
@@ -898,9 +905,7 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr
|
||||
collectorUnit[metric] = scopedMetric.Unit
|
||||
// Collect Initial Data
|
||||
for _, ser := range scopedMetric.Series {
|
||||
for _, val := range ser.Data {
|
||||
collectorData[metric] = append(collectorData[metric], val)
|
||||
}
|
||||
collectorData[metric] = append(collectorData[metric], ser.Data...)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -107,7 +107,7 @@ func setup(t *testing.T) *repository.JobRepository {
|
||||
}
|
||||
|
||||
dbfilepath := filepath.Join(tmpdir, "test.db")
|
||||
err := repository.MigrateDB("sqlite3", dbfilepath)
|
||||
err := repository.MigrateDB(dbfilepath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -770,21 +770,25 @@ func (ccms *CCMetricStore) LoadNodeData(
|
||||
}
|
||||
|
||||
mc := archive.GetMetricConfig(cluster, metric)
|
||||
hostdata[metric] = append(hostdata[metric], &schema.JobMetric{
|
||||
Unit: mc.Unit,
|
||||
Timestep: mc.Timestep,
|
||||
Series: []schema.Series{
|
||||
{
|
||||
Hostname: query.Hostname,
|
||||
Data: qdata.Data,
|
||||
Statistics: schema.MetricStatistics{
|
||||
Avg: float64(qdata.Avg),
|
||||
Min: float64(qdata.Min),
|
||||
Max: float64(qdata.Max),
|
||||
if mc != nil {
|
||||
hostdata[metric] = append(hostdata[metric], &schema.JobMetric{
|
||||
Unit: mc.Unit,
|
||||
Timestep: mc.Timestep,
|
||||
Series: []schema.Series{
|
||||
{
|
||||
Hostname: query.Hostname,
|
||||
Data: qdata.Data,
|
||||
Statistics: schema.MetricStatistics{
|
||||
Avg: float64(qdata.Avg),
|
||||
Min: float64(qdata.Min),
|
||||
Max: float64(qdata.Max),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
})
|
||||
} else {
|
||||
cclog.Warnf("Metric '%s' not configured for cluster '%s': Skipped in LoadNodeData() Return!", metric, cluster)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errors) != 0 {
|
||||
|
||||
@@ -55,6 +55,10 @@ func Connect(driver string, db string) {
|
||||
var err error
|
||||
var dbHandle *sqlx.DB
|
||||
|
||||
if driver != "sqlite3" {
|
||||
cclog.Abortf("Unsupported database driver '%s'. Only 'sqlite3' is supported.\n", driver)
|
||||
}
|
||||
|
||||
dbConnOnce.Do(func() {
|
||||
opts := DatabaseOptions{
|
||||
URL: db,
|
||||
@@ -64,39 +68,31 @@ func Connect(driver string, db string) {
|
||||
ConnectionMaxIdleTime: repoConfig.ConnectionMaxIdleTime,
|
||||
}
|
||||
|
||||
switch driver {
|
||||
case "sqlite3":
|
||||
// TODO: Have separate DB handles for Writes and Reads
|
||||
// Optimize SQLite connection: https://kerkour.com/sqlite-for-servers
|
||||
connectionURLParams := make(url.Values)
|
||||
connectionURLParams.Add("_txlock", "immediate")
|
||||
connectionURLParams.Add("_journal_mode", "WAL")
|
||||
connectionURLParams.Add("_busy_timeout", "5000")
|
||||
connectionURLParams.Add("_synchronous", "NORMAL")
|
||||
connectionURLParams.Add("_cache_size", "1000000000")
|
||||
connectionURLParams.Add("_foreign_keys", "true")
|
||||
opts.URL = fmt.Sprintf("file:%s?%s", opts.URL, connectionURLParams.Encode())
|
||||
// TODO: Have separate DB handles for Writes and Reads
|
||||
// Optimize SQLite connection: https://kerkour.com/sqlite-for-servers
|
||||
connectionURLParams := make(url.Values)
|
||||
connectionURLParams.Add("_txlock", "immediate")
|
||||
connectionURLParams.Add("_journal_mode", "WAL")
|
||||
connectionURLParams.Add("_busy_timeout", "5000")
|
||||
connectionURLParams.Add("_synchronous", "NORMAL")
|
||||
connectionURLParams.Add("_cache_size", "1000000000")
|
||||
connectionURLParams.Add("_foreign_keys", "true")
|
||||
opts.URL = fmt.Sprintf("file:%s?%s", opts.URL, connectionURLParams.Encode())
|
||||
|
||||
if cclog.Loglevel() == "debug" {
|
||||
sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{}))
|
||||
dbHandle, err = sqlx.Open("sqlite3WithHooks", opts.URL)
|
||||
} else {
|
||||
dbHandle, err = sqlx.Open("sqlite3", opts.URL)
|
||||
}
|
||||
|
||||
err = setupSqlite(dbHandle.DB)
|
||||
if err != nil {
|
||||
cclog.Abortf("Failed sqlite db setup.\nError: %s\n", err.Error())
|
||||
}
|
||||
case "mysql":
|
||||
opts.URL += "?multiStatements=true"
|
||||
dbHandle, err = sqlx.Open("mysql", opts.URL)
|
||||
default:
|
||||
cclog.Abortf("DB Connection: Unsupported database driver '%s'.\n", driver)
|
||||
if cclog.Loglevel() == "debug" {
|
||||
sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{}))
|
||||
dbHandle, err = sqlx.Open("sqlite3WithHooks", opts.URL)
|
||||
} else {
|
||||
dbHandle, err = sqlx.Open("sqlite3", opts.URL)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
cclog.Abortf("DB Connection: Could not connect to '%s' database with sqlx.Open().\nError: %s\n", driver, err.Error())
|
||||
cclog.Abortf("DB Connection: Could not connect to SQLite database with sqlx.Open().\nError: %s\n", err.Error())
|
||||
}
|
||||
|
||||
err = setupSqlite(dbHandle.DB)
|
||||
if err != nil {
|
||||
cclog.Abortf("Failed sqlite db setup.\nError: %s\n", err.Error())
|
||||
}
|
||||
|
||||
dbHandle.SetMaxOpenConns(opts.MaxOpenConnections)
|
||||
@@ -105,7 +101,7 @@ func Connect(driver string, db string) {
|
||||
dbHandle.SetConnMaxIdleTime(opts.ConnectionMaxIdleTime)
|
||||
|
||||
dbConnInstance = &DBConnection{DB: dbHandle, Driver: driver}
|
||||
err = checkDBVersion(driver, dbHandle.DB)
|
||||
err = checkDBVersion(dbHandle.DB)
|
||||
if err != nil {
|
||||
cclog.Abortf("DB Connection: Failed DB version check.\nError: %s\n", err.Error())
|
||||
}
|
||||
|
||||
@@ -14,8 +14,6 @@
|
||||
// Initialize the database connection before using any repository:
|
||||
//
|
||||
// repository.Connect("sqlite3", "./var/job.db")
|
||||
// // or for MySQL:
|
||||
// repository.Connect("mysql", "user:password@tcp(localhost:3306)/dbname")
|
||||
//
|
||||
// # Configuration
|
||||
//
|
||||
@@ -158,52 +156,22 @@ func scanJob(row interface{ Scan(...any) error }) (*schema.Job, error) {
|
||||
}
|
||||
|
||||
func (r *JobRepository) Optimize() error {
|
||||
var err error
|
||||
|
||||
switch r.driver {
|
||||
case "sqlite3":
|
||||
if _, err = r.DB.Exec(`VACUUM`); err != nil {
|
||||
return err
|
||||
}
|
||||
case "mysql":
|
||||
cclog.Info("Optimize currently not supported for mysql driver")
|
||||
if _, err := r.DB.Exec(`VACUUM`); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *JobRepository) Flush() error {
|
||||
var err error
|
||||
|
||||
switch r.driver {
|
||||
case "sqlite3":
|
||||
if _, err = r.DB.Exec(`DELETE FROM jobtag`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = r.DB.Exec(`DELETE FROM tag`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = r.DB.Exec(`DELETE FROM job`); err != nil {
|
||||
return err
|
||||
}
|
||||
case "mysql":
|
||||
if _, err = r.DB.Exec(`SET FOREIGN_KEY_CHECKS = 0`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = r.DB.Exec(`TRUNCATE TABLE jobtag`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = r.DB.Exec(`TRUNCATE TABLE tag`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = r.DB.Exec(`TRUNCATE TABLE job`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = r.DB.Exec(`SET FOREIGN_KEY_CHECKS = 1`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := r.DB.Exec(`DELETE FROM jobtag`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := r.DB.Exec(`DELETE FROM tag`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := r.DB.Exec(`DELETE FROM job`); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
|
||||
cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
"github.com/golang-migrate/migrate/v4/database/mysql"
|
||||
"github.com/golang-migrate/migrate/v4/database/sqlite3"
|
||||
"github.com/golang-migrate/migrate/v4/source/iofs"
|
||||
)
|
||||
@@ -22,40 +21,19 @@ const Version uint = 10
|
||||
//go:embed migrations/*
|
||||
var migrationFiles embed.FS
|
||||
|
||||
func checkDBVersion(backend string, db *sql.DB) error {
|
||||
var m *migrate.Migrate
|
||||
func checkDBVersion(db *sql.DB) error {
|
||||
driver, err := sqlite3.WithInstance(db, &sqlite3.Config{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch backend {
|
||||
case "sqlite3":
|
||||
driver, err := sqlite3.WithInstance(db, &sqlite3.Config{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m, err = migrate.NewWithInstance("iofs", d, "sqlite3", driver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case "mysql":
|
||||
driver, err := mysql.WithInstance(db, &mysql.Config{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d, err := iofs.New(migrationFiles, "migrations/mysql")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m, err = migrate.NewWithInstance("iofs", d, "mysql", driver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
cclog.Abortf("Migration: Unsupported database backend '%s'.\n", backend)
|
||||
m, err := migrate.NewWithInstance("iofs", d, "sqlite3", driver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
v, dirty, err := m.Version()
|
||||
@@ -80,37 +58,22 @@ func checkDBVersion(backend string, db *sql.DB) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getMigrateInstance(backend string, db string) (m *migrate.Migrate, err error) {
|
||||
switch backend {
|
||||
case "sqlite3":
|
||||
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
|
||||
if err != nil {
|
||||
cclog.Fatal(err)
|
||||
}
|
||||
func getMigrateInstance(db string) (m *migrate.Migrate, err error) {
|
||||
d, err := iofs.New(migrationFiles, "migrations/sqlite3")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("sqlite3://%s?_foreign_keys=on", db))
|
||||
if err != nil {
|
||||
return m, err
|
||||
}
|
||||
case "mysql":
|
||||
d, err := iofs.New(migrationFiles, "migrations/mysql")
|
||||
if err != nil {
|
||||
return m, err
|
||||
}
|
||||
|
||||
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("mysql://%s?multiStatements=true", db))
|
||||
if err != nil {
|
||||
return m, err
|
||||
}
|
||||
default:
|
||||
cclog.Abortf("Migration: Unsupported database backend '%s'.\n", backend)
|
||||
m, err = migrate.NewWithSourceInstance("iofs", d, fmt.Sprintf("sqlite3://%s?_foreign_keys=on", db))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func MigrateDB(backend string, db string) error {
|
||||
m, err := getMigrateInstance(backend, db)
|
||||
func MigrateDB(db string) error {
|
||||
m, err := getMigrateInstance(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -144,8 +107,8 @@ func MigrateDB(backend string, db string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func RevertDB(backend string, db string) error {
|
||||
m, err := getMigrateInstance(backend, db)
|
||||
func RevertDB(db string) error {
|
||||
m, err := getMigrateInstance(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -162,8 +125,8 @@ func RevertDB(backend string, db string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func ForceDB(backend string, db string) error {
|
||||
m, err := getMigrateInstance(backend, db)
|
||||
func ForceDB(db string) error {
|
||||
m, err := getMigrateInstance(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
DROP TABLE IF EXISTS job;
|
||||
DROP TABLE IF EXISTS tags;
|
||||
DROP TABLE IF EXISTS jobtag;
|
||||
DROP TABLE IF EXISTS configuration;
|
||||
DROP TABLE IF EXISTS user;
|
||||
@@ -1,66 +0,0 @@
|
||||
CREATE TABLE IF NOT EXISTS job (
|
||||
id INTEGER AUTO_INCREMENT PRIMARY KEY ,
|
||||
job_id BIGINT NOT NULL,
|
||||
cluster VARCHAR(255) NOT NULL,
|
||||
subcluster VARCHAR(255) NOT NULL,
|
||||
start_time BIGINT NOT NULL, -- Unix timestamp
|
||||
|
||||
user VARCHAR(255) NOT NULL,
|
||||
project VARCHAR(255) NOT NULL,
|
||||
`partition` VARCHAR(255) NOT NULL,
|
||||
array_job_id BIGINT NOT NULL,
|
||||
duration INT NOT NULL DEFAULT 0,
|
||||
walltime INT NOT NULL DEFAULT 0,
|
||||
job_state VARCHAR(255) NOT NULL
|
||||
CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled',
|
||||
'stopped', 'timeout', 'preempted', 'out_of_memory')),
|
||||
meta_data TEXT, -- JSON
|
||||
resources TEXT NOT NULL, -- JSON
|
||||
|
||||
num_nodes INT NOT NULL,
|
||||
num_hwthreads INT NOT NULL,
|
||||
num_acc INT NOT NULL,
|
||||
smt TINYINT NOT NULL DEFAULT 1 CHECK(smt IN (0, 1 )),
|
||||
exclusive TINYINT NOT NULL DEFAULT 1 CHECK(exclusive IN (0, 1, 2)),
|
||||
monitoring_status TINYINT NOT NULL DEFAULT 1 CHECK(monitoring_status IN (0, 1, 2, 3)),
|
||||
|
||||
mem_used_max REAL NOT NULL DEFAULT 0.0,
|
||||
flops_any_avg REAL NOT NULL DEFAULT 0.0,
|
||||
mem_bw_avg REAL NOT NULL DEFAULT 0.0,
|
||||
load_avg REAL NOT NULL DEFAULT 0.0,
|
||||
net_bw_avg REAL NOT NULL DEFAULT 0.0,
|
||||
net_data_vol_total REAL NOT NULL DEFAULT 0.0,
|
||||
file_bw_avg REAL NOT NULL DEFAULT 0.0,
|
||||
file_data_vol_total REAL NOT NULL DEFAULT 0.0,
|
||||
UNIQUE (job_id, cluster, start_time)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tag (
|
||||
id INTEGER PRIMARY KEY,
|
||||
tag_type VARCHAR(255) NOT NULL,
|
||||
tag_name VARCHAR(255) NOT NULL,
|
||||
UNIQUE (tag_type, tag_name));
|
||||
|
||||
CREATE TABLE IF NOT EXISTS jobtag (
|
||||
job_id INTEGER,
|
||||
tag_id INTEGER,
|
||||
PRIMARY KEY (job_id, tag_id),
|
||||
FOREIGN KEY (job_id) REFERENCES job (id) ON DELETE CASCADE,
|
||||
FOREIGN KEY (tag_id) REFERENCES tag (id) ON DELETE CASCADE);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS user (
|
||||
username varchar(255) PRIMARY KEY NOT NULL,
|
||||
password varchar(255) DEFAULT NULL,
|
||||
ldap tinyint NOT NULL DEFAULT 0, /* col called "ldap" for historic reasons, fills the "AuthSource" */
|
||||
name varchar(255) DEFAULT NULL,
|
||||
roles varchar(255) NOT NULL DEFAULT "[]",
|
||||
email varchar(255) DEFAULT NULL);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS configuration (
|
||||
username varchar(255),
|
||||
confkey varchar(255),
|
||||
value varchar(255),
|
||||
PRIMARY KEY (username, confkey),
|
||||
FOREIGN KEY (username) REFERENCES user (username) ON DELETE CASCADE ON UPDATE NO ACTION);
|
||||
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
DROP INDEX IF EXISTS job_stats;
|
||||
DROP INDEX IF EXISTS job_by_user;
|
||||
DROP INDEX IF EXISTS job_by_starttime;
|
||||
DROP INDEX IF EXISTS job_by_job_id;
|
||||
DROP INDEX IF EXISTS job_list;
|
||||
DROP INDEX IF EXISTS job_list_user;
|
||||
DROP INDEX IF EXISTS job_list_users;
|
||||
DROP INDEX IF EXISTS job_list_users_start;
|
||||
@@ -1,8 +0,0 @@
|
||||
CREATE INDEX IF NOT EXISTS job_stats ON job (cluster,subcluster,user);
|
||||
CREATE INDEX IF NOT EXISTS job_by_user ON job (user);
|
||||
CREATE INDEX IF NOT EXISTS job_by_starttime ON job (start_time);
|
||||
CREATE INDEX IF NOT EXISTS job_by_job_id ON job (job_id);
|
||||
CREATE INDEX IF NOT EXISTS job_list ON job (cluster, job_state);
|
||||
CREATE INDEX IF NOT EXISTS job_list_user ON job (user, cluster, job_state);
|
||||
CREATE INDEX IF NOT EXISTS job_list_users ON job (user, job_state);
|
||||
CREATE INDEX IF NOT EXISTS job_list_users_start ON job (start_time, user, job_state);
|
||||
@@ -1 +0,0 @@
|
||||
ALTER TABLE user DROP COLUMN projects;
|
||||
@@ -1 +0,0 @@
|
||||
ALTER TABLE user ADD COLUMN projects varchar(255) NOT NULL DEFAULT "[]";
|
||||
@@ -1,5 +0,0 @@
|
||||
ALTER TABLE job
|
||||
MODIFY `partition` VARCHAR(255) NOT NULL,
|
||||
MODIFY array_job_id BIGINT NOT NULL,
|
||||
MODIFY num_hwthreads INT NOT NULL,
|
||||
MODIFY num_acc INT NOT NULL;
|
||||
@@ -1,5 +0,0 @@
|
||||
ALTER TABLE job
|
||||
MODIFY `partition` VARCHAR(255),
|
||||
MODIFY array_job_id BIGINT,
|
||||
MODIFY num_hwthreads INT,
|
||||
MODIFY num_acc INT;
|
||||
@@ -1,2 +0,0 @@
|
||||
ALTER TABLE tag DROP COLUMN insert_time;
|
||||
ALTER TABLE jobtag DROP COLUMN insert_time;
|
||||
@@ -1,2 +0,0 @@
|
||||
ALTER TABLE tag ADD COLUMN insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
|
||||
ALTER TABLE jobtag ADD COLUMN insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
|
||||
@@ -1 +0,0 @@
|
||||
ALTER TABLE configuration MODIFY value VARCHAR(255);
|
||||
@@ -1 +0,0 @@
|
||||
ALTER TABLE configuration MODIFY value TEXT;
|
||||
@@ -1,3 +0,0 @@
|
||||
SET FOREIGN_KEY_CHECKS = 0;
|
||||
ALTER TABLE tag MODIFY id INTEGER;
|
||||
SET FOREIGN_KEY_CHECKS = 1;
|
||||
@@ -1,3 +0,0 @@
|
||||
SET FOREIGN_KEY_CHECKS = 0;
|
||||
ALTER TABLE tag MODIFY id INTEGER AUTO_INCREMENT;
|
||||
SET FOREIGN_KEY_CHECKS = 1;
|
||||
@@ -1,83 +0,0 @@
|
||||
ALTER TABLE job DROP energy;
|
||||
ALTER TABLE job DROP energy_footprint;
|
||||
ALTER TABLE job ADD COLUMN flops_any_avg;
|
||||
ALTER TABLE job ADD COLUMN mem_bw_avg;
|
||||
ALTER TABLE job ADD COLUMN mem_used_max;
|
||||
ALTER TABLE job ADD COLUMN load_avg;
|
||||
ALTER TABLE job ADD COLUMN net_bw_avg;
|
||||
ALTER TABLE job ADD COLUMN net_data_vol_total;
|
||||
ALTER TABLE job ADD COLUMN file_bw_avg;
|
||||
ALTER TABLE job ADD COLUMN file_data_vol_total;
|
||||
|
||||
UPDATE job SET flops_any_avg = json_extract(footprint, '$.flops_any_avg');
|
||||
UPDATE job SET mem_bw_avg = json_extract(footprint, '$.mem_bw_avg');
|
||||
UPDATE job SET mem_used_max = json_extract(footprint, '$.mem_used_max');
|
||||
UPDATE job SET load_avg = json_extract(footprint, '$.cpu_load_avg');
|
||||
UPDATE job SET net_bw_avg = json_extract(footprint, '$.net_bw_avg');
|
||||
UPDATE job SET net_data_vol_total = json_extract(footprint, '$.net_data_vol_total');
|
||||
UPDATE job SET file_bw_avg = json_extract(footprint, '$.file_bw_avg');
|
||||
UPDATE job SET file_data_vol_total = json_extract(footprint, '$.file_data_vol_total');
|
||||
|
||||
ALTER TABLE job DROP footprint;
|
||||
-- Do not use reserved keywords anymore
|
||||
RENAME TABLE hpc_user TO `user`;
|
||||
ALTER TABLE job RENAME COLUMN hpc_user TO `user`;
|
||||
ALTER TABLE job RENAME COLUMN cluster_partition TO `partition`;
|
||||
|
||||
DROP INDEX IF EXISTS jobs_cluster;
|
||||
DROP INDEX IF EXISTS jobs_cluster_user;
|
||||
DROP INDEX IF EXISTS jobs_cluster_project;
|
||||
DROP INDEX IF EXISTS jobs_cluster_subcluster;
|
||||
DROP INDEX IF EXISTS jobs_cluster_starttime;
|
||||
DROP INDEX IF EXISTS jobs_cluster_duration;
|
||||
DROP INDEX IF EXISTS jobs_cluster_numnodes;
|
||||
|
||||
DROP INDEX IF EXISTS jobs_cluster_partition;
|
||||
DROP INDEX IF EXISTS jobs_cluster_partition_starttime;
|
||||
DROP INDEX IF EXISTS jobs_cluster_partition_duration;
|
||||
DROP INDEX IF EXISTS jobs_cluster_partition_numnodes;
|
||||
|
||||
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate;
|
||||
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_user;
|
||||
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_project;
|
||||
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_starttime;
|
||||
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_duration;
|
||||
DROP INDEX IF EXISTS jobs_cluster_partition_jobstate_numnodes;
|
||||
|
||||
DROP INDEX IF EXISTS jobs_cluster_jobstate;
|
||||
DROP INDEX IF EXISTS jobs_cluster_jobstate_user;
|
||||
DROP INDEX IF EXISTS jobs_cluster_jobstate_project;
|
||||
|
||||
DROP INDEX IF EXISTS jobs_cluster_jobstate_starttime;
|
||||
DROP INDEX IF EXISTS jobs_cluster_jobstate_duration;
|
||||
DROP INDEX IF EXISTS jobs_cluster_jobstate_numnodes;
|
||||
|
||||
DROP INDEX IF EXISTS jobs_user;
|
||||
DROP INDEX IF EXISTS jobs_user_starttime;
|
||||
DROP INDEX IF EXISTS jobs_user_duration;
|
||||
DROP INDEX IF EXISTS jobs_user_numnodes;
|
||||
|
||||
DROP INDEX IF EXISTS jobs_project;
|
||||
DROP INDEX IF EXISTS jobs_project_user;
|
||||
DROP INDEX IF EXISTS jobs_project_starttime;
|
||||
DROP INDEX IF EXISTS jobs_project_duration;
|
||||
DROP INDEX IF EXISTS jobs_project_numnodes;
|
||||
|
||||
DROP INDEX IF EXISTS jobs_jobstate;
|
||||
DROP INDEX IF EXISTS jobs_jobstate_user;
|
||||
DROP INDEX IF EXISTS jobs_jobstate_project;
|
||||
DROP INDEX IF EXISTS jobs_jobstate_starttime;
|
||||
DROP INDEX IF EXISTS jobs_jobstate_duration;
|
||||
DROP INDEX IF EXISTS jobs_jobstate_numnodes;
|
||||
|
||||
DROP INDEX IF EXISTS jobs_arrayjobid_starttime;
|
||||
DROP INDEX IF EXISTS jobs_cluster_arrayjobid_starttime;
|
||||
|
||||
DROP INDEX IF EXISTS jobs_starttime;
|
||||
DROP INDEX IF EXISTS jobs_duration;
|
||||
DROP INDEX IF EXISTS jobs_numnodes;
|
||||
|
||||
DROP INDEX IF EXISTS jobs_duration_starttime;
|
||||
DROP INDEX IF EXISTS jobs_numnodes_starttime;
|
||||
DROP INDEX IF EXISTS jobs_numacc_starttime;
|
||||
DROP INDEX IF EXISTS jobs_energy_starttime;
|
||||
@@ -1,123 +0,0 @@
|
||||
DROP INDEX IF EXISTS job_stats ON job;
|
||||
DROP INDEX IF EXISTS job_by_user ON job;
|
||||
DROP INDEX IF EXISTS job_by_starttime ON job;
|
||||
DROP INDEX IF EXISTS job_by_job_id ON job;
|
||||
DROP INDEX IF EXISTS job_list ON job;
|
||||
DROP INDEX IF EXISTS job_list_user ON job;
|
||||
DROP INDEX IF EXISTS job_list_users ON job;
|
||||
DROP INDEX IF EXISTS job_list_users_start ON job;
|
||||
|
||||
ALTER TABLE job ADD COLUMN energy REAL NOT NULL DEFAULT 0.0;
|
||||
ALTER TABLE job ADD COLUMN energy_footprint JSON;
|
||||
|
||||
ALTER TABLE job ADD COLUMN footprint JSON;
|
||||
ALTER TABLE tag ADD COLUMN tag_scope TEXT NOT NULL DEFAULT 'global';
|
||||
|
||||
-- Do not use reserved keywords anymore
|
||||
RENAME TABLE `user` TO hpc_user;
|
||||
ALTER TABLE job RENAME COLUMN `user` TO hpc_user;
|
||||
ALTER TABLE job RENAME COLUMN `partition` TO cluster_partition;
|
||||
|
||||
ALTER TABLE job MODIFY COLUMN cluster VARCHAR(50);
|
||||
ALTER TABLE job MODIFY COLUMN hpc_user VARCHAR(50);
|
||||
ALTER TABLE job MODIFY COLUMN subcluster VARCHAR(50);
|
||||
ALTER TABLE job MODIFY COLUMN project VARCHAR(50);
|
||||
ALTER TABLE job MODIFY COLUMN cluster_partition VARCHAR(50);
|
||||
ALTER TABLE job MODIFY COLUMN job_state VARCHAR(25);
|
||||
|
||||
UPDATE job SET footprint = '{"flops_any_avg": 0.0}';
|
||||
UPDATE job SET footprint = json_replace(footprint, '$.flops_any_avg', job.flops_any_avg);
|
||||
UPDATE job SET footprint = json_insert(footprint, '$.mem_bw_avg', job.mem_bw_avg);
|
||||
UPDATE job SET footprint = json_insert(footprint, '$.mem_used_max', job.mem_used_max);
|
||||
UPDATE job SET footprint = json_insert(footprint, '$.cpu_load_avg', job.load_avg);
|
||||
UPDATE job SET footprint = json_insert(footprint, '$.net_bw_avg', job.net_bw_avg) WHERE job.net_bw_avg != 0;
|
||||
UPDATE job SET footprint = json_insert(footprint, '$.net_data_vol_total', job.net_data_vol_total) WHERE job.net_data_vol_total != 0;
|
||||
UPDATE job SET footprint = json_insert(footprint, '$.file_bw_avg', job.file_bw_avg) WHERE job.file_bw_avg != 0;
|
||||
UPDATE job SET footprint = json_insert(footprint, '$.file_data_vol_total', job.file_data_vol_total) WHERE job.file_data_vol_total != 0;
|
||||
|
||||
ALTER TABLE job DROP flops_any_avg;
|
||||
ALTER TABLE job DROP mem_bw_avg;
|
||||
ALTER TABLE job DROP mem_used_max;
|
||||
ALTER TABLE job DROP load_avg;
|
||||
ALTER TABLE job DROP net_bw_avg;
|
||||
ALTER TABLE job DROP net_data_vol_total;
|
||||
ALTER TABLE job DROP file_bw_avg;
|
||||
ALTER TABLE job DROP file_data_vol_total;
|
||||
|
||||
-- Indices for: Single filters, combined filters, sorting, sorting with filters
|
||||
-- Cluster Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster ON job (cluster);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_user ON job (cluster, hpc_user);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_project ON job (cluster, project);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_subcluster ON job (cluster, subcluster);
|
||||
-- Cluster Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_starttime ON job (cluster, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_duration ON job (cluster, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_numnodes ON job (cluster, num_nodes);
|
||||
|
||||
-- Cluster+Partition Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition ON job (cluster, cluster_partition);
|
||||
-- Cluster+Partition Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime ON job (cluster, cluster_partition, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_duration ON job (cluster, cluster_partition, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_numnodes ON job (cluster, cluster_partition, num_nodes);
|
||||
|
||||
-- Cluster+Partition+Jobstate Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate ON job (cluster, cluster_partition, job_state);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_user ON job (cluster, cluster_partition, job_state, hpc_user);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_project ON job (cluster, cluster_partition, job_state, project);
|
||||
-- Cluster+Partition+Jobstate Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_starttime ON job (cluster, cluster_partition, job_state, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_duration ON job (cluster, cluster_partition, job_state, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_numnodes ON job (cluster, cluster_partition, job_state, num_nodes);
|
||||
|
||||
-- Cluster+JobState Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate ON job (cluster, job_state);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_user ON job (cluster, job_state, hpc_user);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_project ON job (cluster, job_state, project);
|
||||
-- Cluster+JobState Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime ON job (cluster, job_state, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_duration ON job (cluster, job_state, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_numnodes ON job (cluster, job_state, num_nodes);
|
||||
|
||||
-- User Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_user ON job (hpc_user);
|
||||
-- User Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_user_starttime ON job (hpc_user, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_user_duration ON job (hpc_user, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_user_numnodes ON job (hpc_user, num_nodes);
|
||||
|
||||
-- Project Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_project ON job (project);
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_user ON job (project, hpc_user);
|
||||
-- Project Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_starttime ON job (project, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_duration ON job (project, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_project_numnodes ON job (project, num_nodes);
|
||||
|
||||
-- JobState Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate ON job (job_state);
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_user ON job (job_state, hpc_user);
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_project ON job (job_state, project);
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_cluster ON job (job_state, cluster);
|
||||
-- JobState Filter Sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_starttime ON job (job_state, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_duration ON job (job_state, duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_jobstate_numnodes ON job (job_state, num_nodes);
|
||||
|
||||
-- ArrayJob Filter
|
||||
CREATE INDEX IF NOT EXISTS jobs_arrayjobid_starttime ON job (array_job_id, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_cluster_arrayjobid_starttime ON job (cluster, array_job_id, start_time);
|
||||
|
||||
-- Sorting without active filters
|
||||
CREATE INDEX IF NOT EXISTS jobs_starttime ON job (start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_duration ON job (duration);
|
||||
CREATE INDEX IF NOT EXISTS jobs_numnodes ON job (num_nodes);
|
||||
|
||||
-- Single filters with default starttime sorting
|
||||
CREATE INDEX IF NOT EXISTS jobs_duration_starttime ON job (duration, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_numnodes_starttime ON job (num_nodes, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_numacc_starttime ON job (num_acc, start_time);
|
||||
CREATE INDEX IF NOT EXISTS jobs_energy_starttime ON job (energy, start_time);
|
||||
|
||||
-- Optimize DB index usage
|
||||
@@ -130,7 +130,7 @@ func nodeTestSetup(t *testing.T) {
|
||||
}
|
||||
|
||||
dbfilepath := filepath.Join(tmpdir, "test.db")
|
||||
err := MigrateDB("sqlite3", dbfilepath)
|
||||
err := MigrateDB(dbfilepath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -149,7 +149,7 @@ func setup(tb testing.TB) *JobRepository {
|
||||
tb.Helper()
|
||||
cclog.Init("warn", true)
|
||||
dbfile := "testdata/job.db"
|
||||
err := MigrateDB("sqlite3", dbfile)
|
||||
err := MigrateDB(dbfile)
|
||||
noErr(tb, err)
|
||||
Connect("sqlite3", dbfile)
|
||||
return GetJobRepository()
|
||||
|
||||
@@ -73,9 +73,6 @@ func (r *JobRepository) buildStatsQuery(
|
||||
col string,
|
||||
) sq.SelectBuilder {
|
||||
var query sq.SelectBuilder
|
||||
castType := r.getCastType()
|
||||
|
||||
// fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
|
||||
|
||||
if col != "" {
|
||||
// Scan columns: id, name, totalJobs, totalUsers, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
|
||||
@@ -84,26 +81,26 @@ func (r *JobRepository) buildStatsQuery(
|
||||
"name",
|
||||
"COUNT(job.id) as totalJobs",
|
||||
"COUNT(DISTINCT job.hpc_user) AS totalUsers",
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s) as totalWalltime`, time.Now().Unix(), castType),
|
||||
fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s) as totalNodes`, castType),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s) as totalNodeHours`, time.Now().Unix(), castType),
|
||||
fmt.Sprintf(`CAST(SUM(job.num_hwthreads) as %s) as totalCores`, castType),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s) as totalCoreHours`, time.Now().Unix(), castType),
|
||||
fmt.Sprintf(`CAST(SUM(job.num_acc) as %s) as totalAccs`, castType),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours`, time.Now().Unix(), castType),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int) as totalWalltime`, time.Now().Unix()),
|
||||
fmt.Sprintf(`CAST(SUM(job.num_nodes) as int) as totalNodes`),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int) as totalNodeHours`, time.Now().Unix()),
|
||||
fmt.Sprintf(`CAST(SUM(job.num_hwthreads) as int) as totalCores`),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int) as totalCoreHours`, time.Now().Unix()),
|
||||
fmt.Sprintf(`CAST(SUM(job.num_acc) as int) as totalAccs`),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int) as totalAccHours`, time.Now().Unix()),
|
||||
).From("job").LeftJoin("hpc_user ON hpc_user.username = job.hpc_user").GroupBy(col)
|
||||
} else {
|
||||
// Scan columns: totalJobs, totalUsers, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
|
||||
query = sq.Select(
|
||||
"COUNT(job.id) as totalJobs",
|
||||
"COUNT(DISTINCT job.hpc_user) AS totalUsers",
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s)`, time.Now().Unix(), castType),
|
||||
fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s)`, castType),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s)`, time.Now().Unix(), castType),
|
||||
fmt.Sprintf(`CAST(SUM(job.num_hwthreads) as %s)`, castType),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as %s)`, time.Now().Unix(), castType),
|
||||
fmt.Sprintf(`CAST(SUM(job.num_acc) as %s)`, castType),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s)`, time.Now().Unix(), castType),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as int)`, time.Now().Unix()),
|
||||
fmt.Sprintf(`CAST(SUM(job.num_nodes) as int)`),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as int)`, time.Now().Unix()),
|
||||
fmt.Sprintf(`CAST(SUM(job.num_hwthreads) as int)`),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_hwthreads) / 3600) as int)`, time.Now().Unix()),
|
||||
fmt.Sprintf(`CAST(SUM(job.num_acc) as int)`),
|
||||
fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as int)`, time.Now().Unix()),
|
||||
).From("job")
|
||||
}
|
||||
|
||||
@@ -114,21 +111,6 @@ func (r *JobRepository) buildStatsQuery(
|
||||
return query
|
||||
}
|
||||
|
||||
func (r *JobRepository) getCastType() string {
|
||||
var castType string
|
||||
|
||||
switch r.driver {
|
||||
case "sqlite3":
|
||||
castType = "int"
|
||||
case "mysql":
|
||||
castType = "unsigned"
|
||||
default:
|
||||
castType = ""
|
||||
}
|
||||
|
||||
return castType
|
||||
}
|
||||
|
||||
func (r *JobRepository) JobsStatsGrouped(
|
||||
ctx context.Context,
|
||||
filter []*model.JobFilter,
|
||||
@@ -477,10 +459,9 @@ func (r *JobRepository) AddHistograms(
|
||||
targetBinSize = 3600
|
||||
}
|
||||
|
||||
castType := r.getCastType()
|
||||
var err error
|
||||
// Return X-Values always as seconds, will be formatted into minutes and hours in frontend
|
||||
value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as %s) as value`, time.Now().Unix(), targetBinSize, castType)
|
||||
value := fmt.Sprintf(`CAST(ROUND(((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / %d) + 1) as int) as value`, time.Now().Unix(), targetBinSize)
|
||||
stat.HistDuration, err = r.jobsDurationStatisticsHistogram(ctx, value, filter, targetBinSize, &targetBinCount)
|
||||
if err != nil {
|
||||
cclog.Warn("Error while loading job statistics histogram: job duration")
|
||||
|
||||
@@ -224,10 +224,10 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
|
||||
}
|
||||
|
||||
// Query and Count Jobs with attached Tags
|
||||
q := sq.Select("t.tag_name, t.id, count(jt.tag_id)").
|
||||
q := sq.Select("t.tag_type, t.tag_name, t.id, count(jt.tag_id)").
|
||||
From("tag t").
|
||||
LeftJoin("jobtag jt ON t.id = jt.tag_id").
|
||||
GroupBy("t.tag_name")
|
||||
GroupBy("t.tag_type, t.tag_name")
|
||||
|
||||
// Build scope list for filtering
|
||||
var scopeBuilder strings.Builder
|
||||
@@ -260,14 +260,15 @@ func (r *JobRepository) CountTags(user *schema.User) (tags []schema.Tag, counts
|
||||
|
||||
counts = make(map[string]int)
|
||||
for rows.Next() {
|
||||
var tagType string
|
||||
var tagName string
|
||||
var tagId int
|
||||
var count int
|
||||
if err = rows.Scan(&tagName, &tagId, &count); err != nil {
|
||||
if err = rows.Scan(&tagType, &tagName, &tagId, &count); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// Use tagId as second Map-Key component to differentiate tags with identical names
|
||||
counts[fmt.Sprint(tagName, tagId)] = count
|
||||
counts[fmt.Sprint(tagType, tagName, tagId)] = count
|
||||
}
|
||||
err = rows.Err()
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ func setupUserTest(t *testing.T) *UserCfgRepo {
|
||||
|
||||
cclog.Init("info", true)
|
||||
dbfilepath := "testdata/job.db"
|
||||
err := MigrateDB("sqlite3", dbfilepath)
|
||||
err := MigrateDB(dbfilepath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -205,13 +205,13 @@ func setupTaglistRoute(i InfoType, r *http.Request) InfoType {
|
||||
"id": tag.ID,
|
||||
"name": tag.Name,
|
||||
"scope": tag.Scope,
|
||||
"count": counts[fmt.Sprint(tag.Name, tag.ID)],
|
||||
"count": counts[fmt.Sprint(tag.Type, tag.Name, tag.ID)],
|
||||
}
|
||||
tagMap[tag.Type] = append(tagMap[tag.Type], tagItem)
|
||||
}
|
||||
} else if userAuthlevel < 4 && userAuthlevel >= 2 { // User+ : Show global and admin scope only if at least 1 tag used, private scope regardless of count
|
||||
for _, tag := range tags {
|
||||
tagCount := counts[fmt.Sprint(tag.Name, tag.ID)]
|
||||
tagCount := counts[fmt.Sprint(tag.Type, tag.Name, tag.ID)]
|
||||
if ((tag.Scope == "global" || tag.Scope == "admin") && tagCount >= 1) || (tag.Scope != "global" && tag.Scope != "admin") {
|
||||
tagItem := map[string]interface{}{
|
||||
"id": tag.ID,
|
||||
|
||||
@@ -15,7 +15,7 @@ func setup(tb testing.TB) *repository.JobRepository {
|
||||
tb.Helper()
|
||||
cclog.Init("warn", true)
|
||||
dbfile := "../repository/testdata/job.db"
|
||||
err := repository.MigrateDB("sqlite3", dbfile)
|
||||
err := repository.MigrateDB(dbfile)
|
||||
noErr(tb, err)
|
||||
repository.Connect("sqlite3", dbfile)
|
||||
return repository.GetJobRepository()
|
||||
|
||||
Reference in New Issue
Block a user