mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2024-12-26 13:29:05 +01:00
Merge branch 'master' into hotfix
This commit is contained in:
commit
f3a8061dfc
@ -32,32 +32,15 @@ func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ConcurrentJobs is the resolver for the concurrentJobs field.
|
// ConcurrentJobs is the resolver for the concurrentJobs field.
|
||||||
func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) {
|
func (r *jobResolver) ConcurrentJobs(
|
||||||
exc := int(obj.Exclusive)
|
ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) {
|
||||||
if exc != 1 {
|
|
||||||
filter := []*model.JobFilter{}
|
|
||||||
jid := fmt.Sprint(obj.JobID)
|
|
||||||
jdu := int(obj.Duration)
|
|
||||||
filter = append(filter, &model.JobFilter{Exclusive: &exc})
|
|
||||||
filter = append(filter, &model.JobFilter{SharedNode: &model.StringInput{Contains: &obj.Resources[0].Hostname}})
|
|
||||||
filter = append(filter, &model.JobFilter{SelfJobID: &model.StringInput{Neq: &jid}})
|
|
||||||
filter = append(filter, &model.JobFilter{SelfStartTime: &obj.StartTime, SelfDuration: &jdu})
|
|
||||||
|
|
||||||
jobLinks, err := r.Repo.QueryJobLinks(ctx, filter)
|
if obj.State == schema.JobStateRunning {
|
||||||
if err != nil {
|
obj.Duration = int32(time.Now().Unix() - obj.StartTimeUnix)
|
||||||
log.Warn("Error while querying jobLinks")
|
}
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
count, err := r.Repo.CountJobs(ctx, filter)
|
if obj.Exclusive != 1 && obj.Duration > 600 {
|
||||||
if err != nil {
|
return r.Repo.FindConcurrentJobs(ctx, obj)
|
||||||
log.Warn("Error while counting jobLinks")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
result := &model.JobLinkResultList{Items: jobLinks, Count: &count}
|
|
||||||
|
|
||||||
return result, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -291,6 +291,97 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
|
|||||||
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *JobRepository) FindConcurrentJobs(
|
||||||
|
ctx context.Context,
|
||||||
|
job *schema.Job) (*model.JobLinkResultList, error) {
|
||||||
|
if job == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job"))
|
||||||
|
if qerr != nil {
|
||||||
|
return nil, qerr
|
||||||
|
}
|
||||||
|
|
||||||
|
query = query.Where("cluster = ?", job.Cluster)
|
||||||
|
var startTime int64
|
||||||
|
var stopTime int64
|
||||||
|
|
||||||
|
startTime = job.StartTimeUnix
|
||||||
|
|
||||||
|
if job.State == schema.JobStateRunning {
|
||||||
|
stopTime = time.Now().Unix()
|
||||||
|
} else {
|
||||||
|
stopTime = startTime + int64(job.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add 5m overlap for jobs start time at the end
|
||||||
|
stopTimeTail := stopTime - 300
|
||||||
|
startTimeTail := startTime + 10
|
||||||
|
startTimeFront := startTime + 300
|
||||||
|
|
||||||
|
queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ?) OR (job.start_time < ?))",
|
||||||
|
"running", startTimeTail, stopTimeTail, startTime)
|
||||||
|
|
||||||
|
query = query.Where("job.job_state != ?").Where("(job.start_time BETWEEN ? AND ?) OR ((job.start_time + job.duration) BETWEEN ? AND ?) OR ((job.start_time < ?) AND ((job.start_time + job.duration)) > ?)",
|
||||||
|
"running", startTimeTail, stopTimeTail, startTimeFront, stopTime, startTime, stopTime)
|
||||||
|
|
||||||
|
rows, err := query.RunWith(r.stmtCache).Query()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error while running query: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
items := make([]*model.JobLink, 0, 10)
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var id, jobId sql.NullInt64
|
||||||
|
|
||||||
|
if err = rows.Scan(&id, &jobId); err != nil {
|
||||||
|
log.Warn("Error while scanning rows")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if id.Valid {
|
||||||
|
items = append(items,
|
||||||
|
&model.JobLink{
|
||||||
|
ID: fmt.Sprint(id),
|
||||||
|
JobID: int(jobId.Int64),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err = queryRunning.RunWith(r.stmtCache).Query()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error while running query: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var id, jobId sql.NullInt64
|
||||||
|
|
||||||
|
if err := rows.Scan(&id, &jobId); err != nil {
|
||||||
|
log.Warn("Error while scanning rows")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if id.Valid {
|
||||||
|
items = append(items,
|
||||||
|
&model.JobLink{
|
||||||
|
ID: fmt.Sprint(id),
|
||||||
|
JobID: int(jobId.Int64),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cnt := len(items)
|
||||||
|
|
||||||
|
return &model.JobLinkResultList{
|
||||||
|
Items: items,
|
||||||
|
Count: &cnt,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Start inserts a new job in the table, returning the unique job ID.
|
// Start inserts a new job in the table, returning the unique job ID.
|
||||||
// Statistics are not transfered!
|
// Statistics are not transfered!
|
||||||
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
|
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
|
||||||
|
@ -48,16 +48,9 @@ func (r *JobRepository) queryJobs(
|
|||||||
query = BuildWhereClause(f, query)
|
query = BuildWhereClause(f, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
sql, args, err := query.ToSql()
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Error while converting query to sql")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
|
|
||||||
rows, err := query.RunWith(r.stmtCache).Query()
|
rows, err := query.RunWith(r.stmtCache).Query()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error while running query")
|
log.Errorf("Error while running query: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,7 +85,6 @@ func (r *JobRepository) QueryJobs(
|
|||||||
order *model.OrderByInput) ([]*schema.Job, error) {
|
order *model.OrderByInput) ([]*schema.Job, error) {
|
||||||
|
|
||||||
query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job"))
|
query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job"))
|
||||||
|
|
||||||
if qerr != nil {
|
if qerr != nil {
|
||||||
return nil, qerr
|
return nil, qerr
|
||||||
}
|
}
|
||||||
@ -100,62 +92,6 @@ func (r *JobRepository) QueryJobs(
|
|||||||
return r.queryJobs(query, filters, page, order)
|
return r.queryJobs(query, filters, page, order)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SecurityCheck-less, private: returns a list of minimal job information (DB-ID and jobId) of shared jobs for link-building based the provided filters.
|
|
||||||
func (r *JobRepository) queryJobLinks(
|
|
||||||
query sq.SelectBuilder,
|
|
||||||
filters []*model.JobFilter) ([]*model.JobLink, error) {
|
|
||||||
|
|
||||||
for _, f := range filters {
|
|
||||||
query = BuildWhereClause(f, query)
|
|
||||||
}
|
|
||||||
|
|
||||||
sql, args, err := query.ToSql()
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Error while converting query to sql")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
|
|
||||||
rows, err := query.RunWith(r.stmtCache).Query()
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Error while running query")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
jobLinks := make([]*model.JobLink, 0, 50)
|
|
||||||
for rows.Next() {
|
|
||||||
jobLink, err := scanJobLink(rows)
|
|
||||||
if err != nil {
|
|
||||||
rows.Close()
|
|
||||||
log.Warn("Error while scanning rows (JobLinks)")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
jobLinks = append(jobLinks, jobLink)
|
|
||||||
}
|
|
||||||
|
|
||||||
return jobLinks, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// testFunction for queryJobLinks
|
|
||||||
func (r *JobRepository) testQueryJobLinks(
|
|
||||||
filters []*model.JobFilter) ([]*model.JobLink, error) {
|
|
||||||
|
|
||||||
return r.queryJobLinks(sq.Select(jobColumns...).From("job"), filters)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *JobRepository) QueryJobLinks(
|
|
||||||
ctx context.Context,
|
|
||||||
filters []*model.JobFilter) ([]*model.JobLink, error) {
|
|
||||||
|
|
||||||
query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job"))
|
|
||||||
|
|
||||||
if qerr != nil {
|
|
||||||
return nil, qerr
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.queryJobLinks(query, filters)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SecurityCheck-less, private: Returns the number of jobs matching the filters
|
// SecurityCheck-less, private: Returns the number of jobs matching the filters
|
||||||
func (r *JobRepository) countJobs(query sq.SelectBuilder,
|
func (r *JobRepository) countJobs(query sq.SelectBuilder,
|
||||||
filters []*model.JobFilter) (int, error) {
|
filters []*model.JobFilter) (int, error) {
|
||||||
@ -164,13 +100,6 @@ func (r *JobRepository) countJobs(query sq.SelectBuilder,
|
|||||||
query = BuildWhereClause(f, query)
|
query = BuildWhereClause(f, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
sql, args, err := query.ToSql()
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Error while converting query to sql")
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debugf("SQL query: `%s`, args: %#v", sql, args)
|
|
||||||
var count int
|
var count int
|
||||||
if err := query.RunWith(r.DB).Scan(&count); err != nil {
|
if err := query.RunWith(r.DB).Scan(&count); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -291,21 +220,6 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
|
|||||||
if filter.MemUsedMax != nil {
|
if filter.MemUsedMax != nil {
|
||||||
query = buildFloatCondition("job.mem_used_max", filter.MemUsedMax, query)
|
query = buildFloatCondition("job.mem_used_max", filter.MemUsedMax, query)
|
||||||
}
|
}
|
||||||
// Shared Jobs Query
|
|
||||||
if filter.Exclusive != nil {
|
|
||||||
query = query.Where("job.exclusive = ?", *filter.Exclusive)
|
|
||||||
}
|
|
||||||
if filter.SharedNode != nil {
|
|
||||||
query = buildStringCondition("job.resources", filter.SharedNode, query)
|
|
||||||
}
|
|
||||||
if filter.SelfJobID != nil {
|
|
||||||
query = buildStringCondition("job.job_id", filter.SelfJobID, query)
|
|
||||||
}
|
|
||||||
if filter.SelfStartTime != nil && filter.SelfDuration != nil {
|
|
||||||
start := filter.SelfStartTime.Unix() + 10 // There does not seem to be a portable way to get the current unix timestamp accross different DBs.
|
|
||||||
end := start + int64(*filter.SelfDuration) - 20
|
|
||||||
query = query.Where("((job.start_time BETWEEN ? AND ?) OR ((job.start_time + job.duration) BETWEEN ? AND ?))", start, end, start, end)
|
|
||||||
}
|
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user