Add timer calls to repository functions

This commit is contained in:
Jan Eitzinger 2023-02-20 15:08:23 +01:00
parent 3b38d8042e
commit 643bd3fb21

View File

@ -93,6 +93,7 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
} }
func (r *JobRepository) FetchJobName(job *schema.Job) (*string, error) { func (r *JobRepository) FetchJobName(job *schema.Job) (*string, error) {
start := time.Now()
cachekey := fmt.Sprintf("metadata:%d", job.ID) cachekey := fmt.Sprintf("metadata:%d", job.ID)
if cached := r.cache.Get(cachekey, nil); cached != nil { if cached := r.cache.Get(cachekey, nil); cached != nil {
job.MetaData = cached.(map[string]string) job.MetaData = cached.(map[string]string)
@ -115,6 +116,7 @@ func (r *JobRepository) FetchJobName(job *schema.Job) (*string, error) {
} }
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
log.Infof("Timer %s", time.Since(start))
if jobName := job.MetaData["jobName"]; jobName != "" { if jobName := job.MetaData["jobName"]; jobName != "" {
return &jobName, nil return &jobName, nil
@ -124,6 +126,7 @@ func (r *JobRepository) FetchJobName(job *schema.Job) (*string, error) {
} }
func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) { func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error) {
start := time.Now()
cachekey := fmt.Sprintf("metadata:%d", job.ID) cachekey := fmt.Sprintf("metadata:%d", job.ID)
if cached := r.cache.Get(cachekey, nil); cached != nil { if cached := r.cache.Get(cachekey, nil); cached != nil {
job.MetaData = cached.(map[string]string) job.MetaData = cached.(map[string]string)
@ -146,6 +149,7 @@ func (r *JobRepository) FetchMetadata(job *schema.Job) (map[string]string, error
} }
r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour) r.cache.Put(cachekey, job.MetaData, len(job.RawMetaData), 24*time.Hour)
log.Infof("Timer %s", time.Since(start))
return job.MetaData, nil return job.MetaData, nil
} }
@ -194,6 +198,7 @@ func (r *JobRepository) Find(
cluster *string, cluster *string,
startTime *int64) (*schema.Job, error) { startTime *int64) (*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job"). q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId) Where("job.job_id = ?", *jobId)
@ -204,6 +209,7 @@ func (r *JobRepository) Find(
q = q.Where("job.start_time = ?", *startTime) q = q.Where("job.start_time = ?", *startTime)
} }
log.Infof("Timer %s", time.Since(start))
return scanJob(q.RunWith(r.stmtCache).QueryRow()) return scanJob(q.RunWith(r.stmtCache).QueryRow())
} }
@ -217,6 +223,7 @@ func (r *JobRepository) FindAll(
cluster *string, cluster *string,
startTime *int64) ([]*schema.Job, error) { startTime *int64) ([]*schema.Job, error) {
start := time.Now()
q := sq.Select(jobColumns...).From("job"). q := sq.Select(jobColumns...).From("job").
Where("job.job_id = ?", *jobId) Where("job.job_id = ?", *jobId)
@ -242,6 +249,7 @@ func (r *JobRepository) FindAll(
} }
jobs = append(jobs, job) jobs = append(jobs, job)
} }
log.Infof("Timer %s", time.Since(start))
return jobs, nil return jobs, nil
} }
@ -324,6 +332,7 @@ func (r *JobRepository) DeleteJobById(id int64) error {
// TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC; // TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, weight *model.Weights, limit *int) (map[string]int, error) { func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggregate, filters []*model.JobFilter, weight *model.Weights, limit *int) (map[string]int, error) {
start := time.Now()
if !aggreg.IsValid() { if !aggreg.IsValid() {
return nil, errors.New("invalid aggregate") return nil, errors.New("invalid aggregate")
} }
@ -370,6 +379,7 @@ func (r *JobRepository) CountGroupedJobs(ctx context.Context, aggreg model.Aggre
counts[group] = count counts[group] = count
} }
log.Infof("Timer %s", time.Since(start))
return counts, nil return counts, nil
} }
@ -536,6 +546,7 @@ func (r *JobRepository) FindUser(ctx context.Context, searchterm string) (userna
} }
func (r *JobRepository) FindProject(ctx context.Context, searchterm string) (project string, err error) { func (r *JobRepository) FindProject(ctx context.Context, searchterm string) (project string, err error) {
user := auth.GetUser(ctx) user := auth.GetUser(ctx)
if user == nil || user.HasRole(auth.RoleAdmin) || user.HasRole(auth.RoleSupport) { if user == nil || user.HasRole(auth.RoleAdmin) || user.HasRole(auth.RoleSupport) {
err := sq.Select("job.project").Distinct().From("job"). err := sq.Select("job.project").Distinct().From("job").
@ -547,7 +558,6 @@ func (r *JobRepository) FindProject(ctx context.Context, searchterm string) (pro
return project, nil return project, nil
} }
return "", ErrNotFound return "", ErrNotFound
} else { } else {
log.Infof("Non-Admin User %s : Requested Query Project -> %s: Forbidden", user.Name, project) log.Infof("Non-Admin User %s : Requested Query Project -> %s: Forbidden", user.Name, project)
return "", ErrForbidden return "", ErrForbidden
@ -556,6 +566,7 @@ func (r *JobRepository) FindProject(ctx context.Context, searchterm string) (pro
func (r *JobRepository) Partitions(cluster string) ([]string, error) { func (r *JobRepository) Partitions(cluster string) ([]string, error) {
var err error var err error
start := time.Now()
partitions := r.cache.Get("partitions:"+cluster, func() (interface{}, time.Duration, int) { partitions := r.cache.Get("partitions:"+cluster, func() (interface{}, time.Duration, int) {
parts := []string{} parts := []string{}
if err = r.DB.Select(&parts, `SELECT DISTINCT job.partition FROM job WHERE job.cluster = ?;`, cluster); err != nil { if err = r.DB.Select(&parts, `SELECT DISTINCT job.partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
@ -567,12 +578,15 @@ func (r *JobRepository) Partitions(cluster string) ([]string, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.Infof("Timer %s", time.Since(start))
return partitions.([]string), nil return partitions.([]string), nil
} }
// AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host. // AllocatedNodes returns a map of all subclusters to a map of hostnames to the amount of jobs running on that host.
// Hosts with zero jobs running on them will not show up! // Hosts with zero jobs running on them will not show up!
func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) { func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]int, error) {
start := time.Now()
subclusters := make(map[string]map[string]int) subclusters := make(map[string]map[string]int)
rows, err := sq.Select("resources", "subcluster").From("job"). rows, err := sq.Select("resources", "subcluster").From("job").
Where("job.job_state = 'running'"). Where("job.job_state = 'running'").
@ -609,10 +623,13 @@ func (r *JobRepository) AllocatedNodes(cluster string) (map[string]map[string]in
} }
} }
log.Infof("Timer %s", time.Since(start))
return subclusters, nil return subclusters, nil
} }
func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error { func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
start := time.Now()
res, err := sq.Update("job"). res, err := sq.Update("job").
Set("monitoring_status", schema.MonitoringStatusArchivingFailed). Set("monitoring_status", schema.MonitoringStatusArchivingFailed).
Set("duration", 0). Set("duration", 0).
@ -635,6 +652,7 @@ func (r *JobRepository) StopJobsExceedingWalltimeBy(seconds int) error {
if rowsAffected > 0 { if rowsAffected > 0 {
log.Infof("%d jobs have been marked as failed due to running too long", rowsAffected) log.Infof("%d jobs have been marked as failed due to running too long", rowsAffected)
} }
log.Infof("Timer %s", time.Since(start))
return nil return nil
} }
@ -652,8 +670,8 @@ var groupBy2column = map[model.Aggregate]string{
func (r *JobRepository) JobsStatistics(ctx context.Context, func (r *JobRepository) JobsStatistics(ctx context.Context,
filter []*model.JobFilter, filter []*model.JobFilter,
groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
start := time.Now()
start := time.Now()
// In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string) // In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string)
stats := map[string]*model.JobsStatistics{} stats := map[string]*model.JobsStatistics{}
@ -800,7 +818,10 @@ func (r *JobRepository) JobsStatistics(ctx context.Context,
// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used // `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used
// to add a condition to the query of the kind "<col> = <id>". // to add a condition to the query of the kind "<col> = <id>".
func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context, value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) { func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context,
value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) {
start := time.Now()
query := sq.Select(value, "COUNT(job.id) AS count").From("job") query := sq.Select(value, "COUNT(job.id) AS count").From("job")
query = SecurityCheck(ctx, query) query = SecurityCheck(ctx, query)
for _, f := range filters { for _, f := range filters {
@ -827,5 +848,6 @@ func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context, value strin
points = append(points, &point) points = append(points, &point)
} }
log.Infof("Timer %s", time.Since(start))
return points, nil return points, nil
} }