mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2025-01-14 05:29:05 +01:00
Sync commit
This commit is contained in:
parent
1f60963cbb
commit
33613cdda0
@ -32,32 +32,15 @@ func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ConcurrentJobs is the resolver for the concurrentJobs field.
|
// ConcurrentJobs is the resolver for the concurrentJobs field.
|
||||||
func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) {
|
func (r *jobResolver) ConcurrentJobs(
|
||||||
exc := int(obj.Exclusive)
|
ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) {
|
||||||
if exc != 1 {
|
|
||||||
filter := []*model.JobFilter{}
|
|
||||||
jid := fmt.Sprint(obj.JobID)
|
|
||||||
jdu := int(obj.Duration)
|
|
||||||
filter = append(filter, &model.JobFilter{Exclusive: &exc})
|
|
||||||
filter = append(filter, &model.JobFilter{SharedNode: &model.StringInput{Contains: &obj.Resources[0].Hostname}})
|
|
||||||
filter = append(filter, &model.JobFilter{SelfJobID: &model.StringInput{Neq: &jid}})
|
|
||||||
filter = append(filter, &model.JobFilter{SelfStartTime: &obj.StartTime, SelfDuration: &jdu})
|
|
||||||
|
|
||||||
jobLinks, err := r.Repo.QueryJobLinks(ctx, filter)
|
if obj.State == schema.JobStateRunning {
|
||||||
if err != nil {
|
obj.Duration = int32(time.Now().Unix() - obj.StartTimeUnix)
|
||||||
log.Warn("Error while querying jobLinks")
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
count, err := r.Repo.CountJobs(ctx, filter)
|
if obj.Exclusive != 1 && obj.Duration > 600 {
|
||||||
if err != nil {
|
return r.Repo.FindConcurrentJobs(obj)
|
||||||
log.Warn("Error while counting jobLinks")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
result := &model.JobLinkResultList{Items: jobLinks, Count: &count}
|
|
||||||
|
|
||||||
return result, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -292,6 +292,26 @@ func (r *JobRepository) FindById(jobId int64) (*schema.Job, error) {
|
|||||||
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
return scanJob(q.RunWith(r.stmtCache).QueryRow())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *JobRepository) FindConcurrentJobs(
|
||||||
|
job *schema.Job) (*model.JobLinkResultList, error) {
|
||||||
|
|
||||||
|
query := sq.Select("job.id","job.job_id").From("job").Where("cluster = ?",job.Cluster)
|
||||||
|
var startTime := job.StartTimeUnix
|
||||||
|
var stopTime int64
|
||||||
|
|
||||||
|
if job.State == schema.JobStateRunning {
|
||||||
|
stopTime = time.Now().Unix()
|
||||||
|
} else {
|
||||||
|
stopTime = startTime + int64(job.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add 5m overlap for jobs start time at the end
|
||||||
|
stopTime -= 300
|
||||||
|
|
||||||
|
query = query.Where("start_time BETWEEN ? AND ?", startTime, stopTime)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// Start inserts a new job in the table, returning the unique job ID.
|
// Start inserts a new job in the table, returning the unique job ID.
|
||||||
// Statistics are not transfered!
|
// Statistics are not transfered!
|
||||||
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
|
func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user