Merge branch 'dev' into 275_tag_scope_jobview_rework

This commit is contained in:
Christoph Kluge 2024-09-17 14:32:06 +02:00
commit 6367c1ab4d
5 changed files with 89 additions and 13 deletions

View File

@ -145,14 +145,18 @@ func GetAuthInstance() *Authentication {
func persistUser(user *schema.User) { func persistUser(user *schema.User) {
r := repository.GetUserRepository() r := repository.GetUserRepository()
_, err := r.GetUser(user.Username) dbUser, err := r.GetUser(user.Username)
if err != nil && err != sql.ErrNoRows { if err != nil && err != sql.ErrNoRows {
log.Errorf("Error while loading user '%s': %v", user.Username, err) log.Errorf("Error while loading user '%s': %v", user.Username, err)
} else if err == sql.ErrNoRows { } else if err == sql.ErrNoRows { // Adds New User
if err := r.AddUser(user); err != nil { if err := r.AddUser(user); err != nil {
log.Errorf("Error while adding user '%s' to DB: %v", user.Username, err) log.Errorf("Error while adding user '%s' to DB: %v", user.Username, err)
} }
} else { // Update Existing
if err := r.UpdateUser(dbUser, user); err != nil {
log.Errorf("Error while updating user '%s' to DB: %v", user.Username, err)
}
} }
} }

View File

@ -484,10 +484,12 @@ func (r *JobRepository) FindRunningJobs(cluster string) ([]*schema.Job, error) {
} }
func (r *JobRepository) UpdateDuration() error { func (r *JobRepository) UpdateDuration() error {
if _, err := sq.Update("job"). stmnt := sq.Update("job").
Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())). Set("duration", sq.Expr("? - job.start_time", time.Now().Unix())).
Where("job_state = running"). Where("job_state = 'running'")
RunWith(r.stmtCache).Exec(); err != nil {
_, err := stmnt.RunWith(r.stmtCache).Exec()
if err != nil {
return err return err
} }

View File

@ -1,7 +1,11 @@
CREATE INDEX IF NOT EXISTS jobs_cluster_orderby_starttime ON job (cluster, start_time DESC); DROP INDEX job_stats;
CREATE INDEX IF NOT EXISTS jobs_cluster_count ON job (cluster, job_state, start_time); DROP INDEX job_by_user;
CREATE INDEX IF NOT EXISTS jobs_project_orderby_starttime ON job (project, start_time DESC); DROP INDEX job_by_starttime;
CREATE INDEX IF NOT EXISTS jobs_project_count ON job (project, job_state, start_time); DROP INDEX job_by_job_id;
DROP INDEX job_list;
DROP INDEX job_list_user;
DROP INDEX job_list_users;
DROP INDEX job_list_users_start;
ALTER TABLE job ADD COLUMN energy REAL NOT NULL DEFAULT 0.0; ALTER TABLE job ADD COLUMN energy REAL NOT NULL DEFAULT 0.0;
ALTER TABLE job ADD COLUMN energy_footprint TEXT DEFAULT NULL; ALTER TABLE job ADD COLUMN energy_footprint TEXT DEFAULT NULL;
@ -28,4 +32,44 @@ ALTER TABLE job DROP net_data_vol_total;
ALTER TABLE job DROP file_bw_avg; ALTER TABLE job DROP file_bw_avg;
ALTER TABLE job DROP file_data_vol_total; ALTER TABLE job DROP file_data_vol_total;
CREATE INDEX IF NOT EXISTS jobs_cluster ON job (cluster);
CREATE INDEX IF NOT EXISTS jobs_cluster_starttime ON job (cluster, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_user ON job (cluster, user);
CREATE INDEX IF NOT EXISTS jobs_cluster_project ON job (cluster, project);
CREATE INDEX IF NOT EXISTS jobs_cluster_subcluster ON job (cluster, subcluster);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition ON job (cluster, partition);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_starttime ON job (cluster, partition, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate ON job (cluster, partition, job_state);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_user ON job (cluster, partition, job_state, user);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_project ON job (cluster, partition, job_state, project);
CREATE INDEX IF NOT EXISTS jobs_cluster_partition_jobstate_starttime ON job (cluster, partition, job_state, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate ON job (cluster, job_state);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_starttime ON job (cluster, job_state, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_user ON job (cluster, job_state, user);
CREATE INDEX IF NOT EXISTS jobs_cluster_jobstate_project ON job (cluster, job_state, project);
CREATE INDEX IF NOT EXISTS jobs_user ON job (user);
CREATE INDEX IF NOT EXISTS jobs_user_starttime ON job (user, start_time);
CREATE INDEX IF NOT EXISTS jobs_project ON job (project);
CREATE INDEX IF NOT EXISTS jobs_project_starttime ON job (project, start_time);
CREATE INDEX IF NOT EXISTS jobs_project_user ON job (project, user);
CREATE INDEX IF NOT EXISTS jobs_jobstate ON job (job_state);
CREATE INDEX IF NOT EXISTS jobs_jobstate_user ON job (job_state, user);
CREATE INDEX IF NOT EXISTS jobs_jobstate_project ON job (job_state, project);
CREATE INDEX IF NOT EXISTS jobs_jobstate_cluster ON job (job_state, cluster);
CREATE INDEX IF NOT EXISTS jobs_jobstate_starttime ON job (job_state, start_time);
CREATE INDEX IF NOT EXISTS jobs_arrayjobid_starttime ON job (array_job_id, start_time);
CREATE INDEX IF NOT EXISTS jobs_cluster_arrayjobid_starttime ON job (cluster, array_job_id, start_time);
CREATE INDEX IF NOT EXISTS jobs_starttime ON job (start_time);
CREATE INDEX IF NOT EXISTS jobs_duration ON job (duration);
CREATE INDEX IF NOT EXISTS jobs_numnodes ON job (num_nodes);
CREATE INDEX IF NOT EXISTS jobs_numhwthreads ON job (num_hwthreads);
CREATE INDEX IF NOT EXISTS jobs_numacc ON job (num_acc);
PRAGMA optimize; PRAGMA optimize;

View File

@ -130,6 +130,27 @@ func (r *UserRepository) AddUser(user *schema.User) error {
return nil return nil
} }
func (r *UserRepository) UpdateUser(dbUser *schema.User, user *schema.User) error {
// user contains updated info, apply to dbuser
// TODO: Discuss updatable fields
if dbUser.Name != user.Name {
if _, err := sq.Update("user").Set("name", user.Name).Where("user.username = ?", dbUser.Username).RunWith(r.DB).Exec(); err != nil {
log.Errorf("error while updating name of user '%s'", user.Username)
return err
}
}
// Toggled until greenlit
// if dbUser.HasRole(schema.RoleManager) && !reflect.DeepEqual(dbUser.Projects, user.Projects) {
// projects, _ := json.Marshal(user.Projects)
// if _, err := sq.Update("user").Set("projects", projects).Where("user.username = ?", dbUser.Username).RunWith(r.DB).Exec(); err != nil {
// return err
// }
// }
return nil
}
func (r *UserRepository) DelUser(username string) error { func (r *UserRepository) DelUser(username string) error {
_, err := r.DB.Exec(`DELETE FROM user WHERE user.username = ?`, username) _, err := r.DB.Exec(`DELETE FROM user WHERE user.username = ?`, username)
if err != nil { if err != nil {

View File

@ -47,9 +47,10 @@ func RegisterFootprintWorker() {
scopes = append(scopes, schema.MetricScopeAccelerator) scopes = append(scopes, schema.MetricScopeAccelerator)
for _, job := range jobs { for _, job := range jobs {
log.Debugf("Try job %d", job.JobID)
jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, context.Background()) jobData, err := metricDataDispatcher.LoadData(job, allMetrics, scopes, context.Background())
if err != nil { if err != nil {
log.Error("Error wile loading job data for footprint update") log.Errorf("Error wile loading job data for footprint update: %v", err)
continue continue
} }
@ -85,11 +86,14 @@ func RegisterFootprintWorker() {
} }
stmt := sq.Update("job").Where("job.id = ?", job.ID) stmt := sq.Update("job").Where("job.id = ?", job.ID)
if stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta); err != nil { stmt, err = jobRepo.UpdateFootprint(stmt, jobMeta)
if err != nil {
log.Errorf("Update job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error()) log.Errorf("Update job (dbid: %d) failed at update Footprint step: %s", job.ID, err.Error())
continue continue
} }
if stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta); err != nil {
stmt, err = jobRepo.UpdateEnergy(stmt, jobMeta)
if err != nil {
log.Errorf("Update job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error()) log.Errorf("Update job (dbid: %d) failed at update Energy step: %s", job.ID, err.Error())
continue continue
} }
@ -104,6 +108,7 @@ func RegisterFootprintWorker() {
// log.Errorf("Update job (dbid: %d) failed at db execute: %s", job.ID, err.Error()) // log.Errorf("Update job (dbid: %d) failed at db execute: %s", job.ID, err.Error())
// continue // continue
// } // }
log.Debugf("Finish job %d", job.JobID)
} }
jobRepo.TransactionCommit(t) jobRepo.TransactionCommit(t)