List parallel jobs on the same node for jobs on shared nodes

- Relates to issue #97
- Required GQL schema extension and regeneration
- Works for archived jobs as well
Christoph Kluge
2023-04-28 12:34:40 +02:00
parent e9301a5d39
commit d93610f700
8 changed files with 691 additions and 5 deletions
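
The commit message notes a required GQL schema extension; the hunks below consume the regenerated model type model.JobLink, whose Go shape is not itself shown. From the scan targets (&jobLink.ID, &jobLink.JobID) it plausibly looks like the following sketch; the field types and json tags are assumptions (gqlgen maps the GraphQL ID scalar to string):

// Hypothetical regenerated gqlgen model, inferred from the scan code below.
type JobLink struct {
	ID    string `json:"id"`    // database id, scanned from job.id
	JobID int    `json:"jobId"` // scheduler job id, scanned from job.job_id
}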


@@ -74,7 +74,7 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
 		&job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId,
 		&job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State,
 		&job.Duration, &job.Walltime, &job.RawResources /*&job.RawMetaData*/); err != nil {
-		log.Warn("Error while scanning rows")
+		log.Warn("Error while scanning rows (Job)")
 		return nil, err
 	}
@@ -96,6 +96,17 @@ func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) {
 	return job, nil
 }
 
+func scanJobLink(row interface{ Scan(...interface{}) error }) (*model.JobLink, error) {
+	jobLink := &model.JobLink{}
+	if err := row.Scan(
+		&jobLink.ID, &jobLink.JobID); err != nil {
+		log.Warn("Error while scanning rows (jobLink)")
+		return nil, err
+	}
+
+	return jobLink, nil
+}
+
 func (r *JobRepository) FetchJobName(job *schema.Job) (*string, error) {
 	start := time.Now()
 	cachekey := fmt.Sprintf("metadata:%d", job.ID)


@@ -70,7 +70,7 @@ func (r *JobRepository) QueryJobs(
 		job, err := scanJob(rows)
 		if err != nil {
 			rows.Close()
-			log.Warn("Error while scanning rows")
+			log.Warn("Error while scanning rows (Jobs)")
 			return nil, err
 		}
 		jobs = append(jobs, job)
@@ -79,6 +79,48 @@ func (r *JobRepository) QueryJobs(
 	return jobs, nil
 }
 
+// QueryJobLinks returns a list of minimal job information (DB-ID and jobId) of shared jobs for link-building, based on the provided filters.
+func (r *JobRepository) QueryJobLinks(
+	ctx context.Context,
+	filters []*model.JobFilter) ([]*model.JobLink, error) {
+
+	query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job"))
+
+	if qerr != nil {
+		return nil, qerr
+	}
+
+	for _, f := range filters {
+		query = BuildWhereClause(f, query)
+	}
+
+	sql, args, err := query.ToSql()
+	if err != nil {
+		log.Warn("Error while converting query to sql")
+		return nil, err
+	}
+
+	log.Debugf("SQL query: `%s`, args: %#v", sql, args)
+	rows, err := query.RunWith(r.stmtCache).Query()
+	if err != nil {
+		log.Error("Error while running query")
+		return nil, err
+	}
+
+	jobLinks := make([]*model.JobLink, 0, 50)
+	for rows.Next() {
+		jobLink, err := scanJobLink(rows)
+		if err != nil {
+			rows.Close()
+			log.Warn("Error while scanning rows (JobLinks)")
+			return nil, err
+		}
+		jobLinks = append(jobLinks, jobLink)
+	}
+
+	return jobLinks, nil
+}
+
 // CountJobs counts the number of jobs matching the filters.
 func (r *JobRepository) CountJobs(
 	ctx context.Context,
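
A minimal sketch of how a caller might use the new method, written as if inside the same repository package; the wrapper function and its name are hypothetical, only QueryJobLinks and model.JobLink come from this commit:

// Hypothetical caller: fetch link info for shared jobs matching the filters.
func listSharedJobLinks(ctx context.Context, r *JobRepository, filters []*model.JobFilter) ([]*model.JobLink, error) {
	links, err := r.QueryJobLinks(ctx, filters)
	if err != nil {
		return nil, err
	}
	for _, l := range links {
		log.Debugf("shared job link: db-id=%s jobId=%d", l.ID, l.JobID) // assumes ID string, JobID int
	}
	return links, nil
}
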
@@ -187,6 +229,22 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
 	if filter.MemUsedMax != nil {
 		query = buildFloatCondition("job.mem_used_max", filter.MemUsedMax, query)
 	}
+	// Shared Jobs Query
+	if filter.Exclusive != nil {
+		query = query.Where("job.exclusive = ?", *filter.Exclusive)
+	}
+	if filter.SharedNode != nil {
+		query = buildStringCondition("job.resources", filter.SharedNode, query)
+	}
+	if filter.SelfJobID != nil {
+		query = buildStringCondition("job.job_id", filter.SelfJobID, query)
+	}
+	if filter.SelfStartTime != nil && filter.SelfDuration != nil { // Offset of 30 minutes?
+		log.Debug("SET SELFTIME FILTERS")
+		start := filter.SelfStartTime.Unix() // There does not seem to be a portable way to get the current Unix timestamp across different DBs.
+		end := start + int64(*filter.SelfDuration)
+		query = query.Where("((job.start_time BETWEEN ? AND ?) OR ((job.start_time + job.duration) BETWEEN ? AND ?))", start, end, start, end)
+	}
 	return query
 }
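
The self-time filter above selects jobs whose start or whose end falls inside the reference job's runtime window. A worked illustration, with the reference job's start time and duration assumed:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Reference job (values assumed): started 2023-04-28 10:00:00 UTC, ran 3600s.
	selfStart := time.Date(2023, 4, 28, 10, 0, 0, 0, time.UTC)
	selfDuration := 3600

	start := selfStart.Unix()          // 1682676000
	end := start + int64(selfDuration) // 1682679600

	// A candidate job is picked up if it starts inside the window
	// or ends inside it, mirroring the SQL predicate:
	//   (job.start_time BETWEEN start AND end)
	//   OR ((job.start_time + job.duration) BETWEEN start AND end)
	fmt.Printf("window: [%d, %d]\n", start, end) // window: [1682676000, 1682679600]
}
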
@@ -214,6 +272,9 @@ func buildStringCondition(field string, cond *model.StringInput, query sq.Select
 	if cond.Eq != nil {
 		return query.Where(field+" = ?", *cond.Eq)
 	}
+	if cond.Neq != nil {
+		return query.Where(field+" != ?", *cond.Neq)
+	}
 	if cond.StartsWith != nil {
 		return query.Where(field+" LIKE ?", fmt.Sprint(*cond.StartsWith, "%"))
 	}
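
The new Neq branch composes like the existing conditions; it pairs with the SelfJobID filter above, which can use it to exclude the requesting job itself. A small in-package sketch of the resulting SQL, with the job id value assumed:

// Sketch: buildStringCondition with Neq (the id value "1337" is made up).
self := "1337"
q := buildStringCondition("job.job_id", &model.StringInput{Neq: &self}, sq.Select("job.id").From("job"))
sql, args, _ := q.ToSql()
// sql:  SELECT job.id FROM job WHERE job.job_id != ?
// args: ["1337"]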