Add node filter and concurrent job list query

This commit is contained in:
Jan Eitzinger 2023-06-28 13:35:41 +02:00
parent 3828c138b8
commit 7174f27a89
6 changed files with 96 additions and 56 deletions

View File

@ -237,10 +237,7 @@ input JobFilter {
memUsedMax: FloatRange memUsedMax: FloatRange
exclusive: Int exclusive: Int
sharedNode: StringInput node: StringInput
selfJobId: StringInput
selfStartTime: Time
selfDuration: Int
} }
input OrderByInput { input OrderByInput {
@ -274,6 +271,7 @@ type JobResultList {
} }
type JobLinkResultList { type JobLinkResultList {
listQuery: String
items: [JobLink!]! items: [JobLink!]!
count: Int count: Int
} }

View File

@ -116,6 +116,7 @@ type ComplexityRoot struct {
JobLinkResultList struct { JobLinkResultList struct {
Count func(childComplexity int) int Count func(childComplexity int) int
Items func(childComplexity int) int Items func(childComplexity int) int
ListQuery func(childComplexity int) int
} }
JobMetric struct { JobMetric struct {
@ -629,6 +630,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.JobLinkResultList.Items(childComplexity), true return e.complexity.JobLinkResultList.Items(childComplexity), true
case "JobLinkResultList.listQuery":
if e.complexity.JobLinkResultList.ListQuery == nil {
break
}
return e.complexity.JobLinkResultList.ListQuery(childComplexity), true
case "JobMetric.series": case "JobMetric.series":
if e.complexity.JobMetric.Series == nil { if e.complexity.JobMetric.Series == nil {
break break
@ -1739,10 +1747,7 @@ input JobFilter {
memUsedMax: FloatRange memUsedMax: FloatRange
exclusive: Int exclusive: Int
sharedNode: StringInput node: StringInput
selfJobId: StringInput
selfStartTime: Time
selfDuration: Int
} }
input OrderByInput { input OrderByInput {
@ -1776,6 +1781,7 @@ type JobResultList {
} }
type JobLinkResultList { type JobLinkResultList {
listQuery: String
items: [JobLink!]! items: [JobLink!]!
count: Int count: Int
} }
@ -3951,6 +3957,8 @@ func (ec *executionContext) fieldContext_Job_concurrentJobs(ctx context.Context,
IsResolver: true, IsResolver: true,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
switch field.Name { switch field.Name {
case "listQuery":
return ec.fieldContext_JobLinkResultList_listQuery(ctx, field)
case "items": case "items":
return ec.fieldContext_JobLinkResultList_items(ctx, field) return ec.fieldContext_JobLinkResultList_items(ctx, field)
case "count": case "count":
@ -4140,6 +4148,47 @@ func (ec *executionContext) fieldContext_JobLink_jobId(ctx context.Context, fiel
return fc, nil return fc, nil
} }
func (ec *executionContext) _JobLinkResultList_listQuery(ctx context.Context, field graphql.CollectedField, obj *model.JobLinkResultList) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_JobLinkResultList_listQuery(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.ListQuery, nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
if resTmp == nil {
return graphql.Null
}
res := resTmp.(*string)
fc.Result = res
return ec.marshalOString2ᚖstring(ctx, field.Selections, res)
}
func (ec *executionContext) fieldContext_JobLinkResultList_listQuery(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "JobLinkResultList",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type String does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _JobLinkResultList_items(ctx context.Context, field graphql.CollectedField, obj *model.JobLinkResultList) (ret graphql.Marshaler) { func (ec *executionContext) _JobLinkResultList_items(ctx context.Context, field graphql.CollectedField, obj *model.JobLinkResultList) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_JobLinkResultList_items(ctx, field) fc, err := ec.fieldContext_JobLinkResultList_items(ctx, field)
if err != nil { if err != nil {
@ -11148,7 +11197,7 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj int
asMap[k] = v asMap[k] = v
} }
fieldsInOrder := [...]string{"tags", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "partition", "duration", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "flopsAnyAvg", "memBwAvg", "loadAvg", "memUsedMax", "exclusive", "sharedNode", "selfJobId", "selfStartTime", "selfDuration"} fieldsInOrder := [...]string{"tags", "jobId", "arrayJobId", "user", "project", "jobName", "cluster", "partition", "duration", "minRunningFor", "numNodes", "numAccelerators", "numHWThreads", "startTime", "state", "flopsAnyAvg", "memBwAvg", "loadAvg", "memUsedMax", "exclusive", "node"}
for _, k := range fieldsInOrder { for _, k := range fieldsInOrder {
v, ok := asMap[k] v, ok := asMap[k]
if !ok { if !ok {
@ -11315,35 +11364,11 @@ func (ec *executionContext) unmarshalInputJobFilter(ctx context.Context, obj int
if err != nil { if err != nil {
return it, err return it, err
} }
case "sharedNode": case "node":
var err error var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("sharedNode")) ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("node"))
it.SharedNode, err = ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v) it.Node, err = ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
if err != nil {
return it, err
}
case "selfJobId":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selfJobId"))
it.SelfJobID, err = ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v)
if err != nil {
return it, err
}
case "selfStartTime":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selfStartTime"))
it.SelfStartTime, err = ec.unmarshalOTime2ᚖtimeᚐTime(ctx, v)
if err != nil {
return it, err
}
case "selfDuration":
var err error
ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("selfDuration"))
it.SelfDuration, err = ec.unmarshalOInt2ᚖint(ctx, v)
if err != nil { if err != nil {
return it, err return it, err
} }
@ -12055,6 +12080,10 @@ func (ec *executionContext) _JobLinkResultList(ctx context.Context, sel ast.Sele
switch field.Name { switch field.Name {
case "__typename": case "__typename":
out.Values[i] = graphql.MarshalString("JobLinkResultList") out.Values[i] = graphql.MarshalString("JobLinkResultList")
case "listQuery":
out.Values[i] = ec._JobLinkResultList_listQuery(ctx, field, obj)
case "items": case "items":
out.Values[i] = ec._JobLinkResultList_items(ctx, field, obj) out.Values[i] = ec._JobLinkResultList_items(ctx, field, obj)

View File

@ -57,10 +57,7 @@ type JobFilter struct {
LoadAvg *FloatRange `json:"loadAvg"` LoadAvg *FloatRange `json:"loadAvg"`
MemUsedMax *FloatRange `json:"memUsedMax"` MemUsedMax *FloatRange `json:"memUsedMax"`
Exclusive *int `json:"exclusive"` Exclusive *int `json:"exclusive"`
SharedNode *StringInput `json:"sharedNode"` Node *StringInput `json:"node"`
SelfJobID *StringInput `json:"selfJobId"`
SelfStartTime *time.Time `json:"selfStartTime"`
SelfDuration *int `json:"selfDuration"`
} }
type JobLink struct { type JobLink struct {
@ -69,6 +66,7 @@ type JobLink struct {
} }
type JobLinkResultList struct { type JobLinkResultList struct {
ListQuery *string `json:"listQuery"`
Items []*JobLink `json:"items"` Items []*JobLink `json:"items"`
Count *int `json:"count"` Count *int `json:"count"`
} }

View File

@ -32,9 +32,7 @@ func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag,
} }
// ConcurrentJobs is the resolver for the concurrentJobs field. // ConcurrentJobs is the resolver for the concurrentJobs field.
func (r *jobResolver) ConcurrentJobs( func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) {
ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) {
if obj.State == schema.JobStateRunning { if obj.State == schema.JobStateRunning {
obj.Duration = int32(time.Now().Unix() - obj.StartTimeUnix) obj.Duration = int32(time.Now().Unix() - obj.StartTimeUnix)
} }

View File

@ -10,6 +10,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"math"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -17,6 +18,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/internal/util"
"github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
@ -298,7 +300,7 @@ func (r *JobRepository) FindConcurrentJobs(
return nil, nil return nil, nil
} }
query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id").From("job")) query, qerr := SecurityCheck(ctx, sq.Select("job.id", "job.job_id", "job.start_time").From("job"))
if qerr != nil { if qerr != nil {
return nil, qerr return nil, qerr
} }
@ -308,6 +310,7 @@ func (r *JobRepository) FindConcurrentJobs(
var stopTime int64 var stopTime int64
startTime = job.StartTimeUnix startTime = job.StartTimeUnix
hostname := job.Resources[0].Hostname
if job.State == schema.JobStateRunning { if job.State == schema.JobStateRunning {
stopTime = time.Now().Unix() stopTime = time.Now().Unix()
@ -322,11 +325,11 @@ func (r *JobRepository) FindConcurrentJobs(
queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ? OR job.start_time < ?)", queryRunning := query.Where("job.job_state = ?").Where("(job.start_time BETWEEN ? AND ? OR job.start_time < ?)",
"running", startTimeTail, stopTimeTail, startTime) "running", startTimeTail, stopTimeTail, startTime)
queryRunning = queryRunning.Where("job.resources LIKE ?", fmt.Sprint("%", job.Resources[0].Hostname, "%")) queryRunning = queryRunning.Where("job.resources LIKE ?", fmt.Sprint("%", hostname, "%"))
query = query.Where("job.job_state != ?").Where("((job.start_time BETWEEN ? AND ?) OR (job.start_time + job.duration) BETWEEN ? AND ? OR (job.start_time < ?) AND (job.start_time + job.duration) > ?)", query = query.Where("job.job_state != ?").Where("((job.start_time BETWEEN ? AND ?) OR (job.start_time + job.duration) BETWEEN ? AND ? OR (job.start_time < ?) AND (job.start_time + job.duration) > ?)",
"running", startTimeTail, stopTimeTail, startTimeFront, stopTimeTail, startTime, stopTime) "running", startTimeTail, stopTimeTail, startTimeFront, stopTimeTail, startTime, stopTime)
query = query.Where("job.resources LIKE ?", fmt.Sprint("%", job.Resources[0].Hostname, "%")) query = query.Where("job.resources LIKE ?", fmt.Sprint("%", hostname, "%"))
rows, err := query.RunWith(r.stmtCache).Query() rows, err := query.RunWith(r.stmtCache).Query()
if err != nil { if err != nil {
@ -335,16 +338,21 @@ func (r *JobRepository) FindConcurrentJobs(
} }
items := make([]*model.JobLink, 0, 10) items := make([]*model.JobLink, 0, 10)
minStart := int64(math.MaxInt64)
maxStart := int64(0)
for rows.Next() { for rows.Next() {
var id, jobId sql.NullInt64 var id, jobId, startTime sql.NullInt64
if err = rows.Scan(&id, &jobId); err != nil { if err = rows.Scan(&id, &jobId, &startTime); err != nil {
log.Warn("Error while scanning rows") log.Warn("Error while scanning rows")
return nil, err return nil, err
} }
if id.Valid { if id.Valid {
minStart = util.Min(minStart, startTime.Int64)
maxStart = util.Max(maxStart, startTime.Int64)
items = append(items, items = append(items,
&model.JobLink{ &model.JobLink{
ID: fmt.Sprint(id.Int64), ID: fmt.Sprint(id.Int64),
@ -360,14 +368,17 @@ func (r *JobRepository) FindConcurrentJobs(
} }
for rows.Next() { for rows.Next() {
var id, jobId sql.NullInt64 var id, jobId, startTime sql.NullInt64
if err := rows.Scan(&id, &jobId); err != nil { if err := rows.Scan(&id, &jobId, &startTime); err != nil {
log.Warn("Error while scanning rows") log.Warn("Error while scanning rows")
return nil, err return nil, err
} }
if id.Valid { if id.Valid {
minStart = util.Min(minStart, startTime.Int64)
maxStart = util.Max(maxStart, startTime.Int64)
items = append(items, items = append(items,
&model.JobLink{ &model.JobLink{
ID: fmt.Sprint(id.Int64), ID: fmt.Sprint(id.Int64),
@ -377,8 +388,11 @@ func (r *JobRepository) FindConcurrentJobs(
} }
cnt := len(items) cnt := len(items)
queryString := fmt.Sprintf("cluster=%s&startTime=%d-%d&node=%s",
job.Cluster, minStart, maxStart, hostname)
return &model.JobLinkResultList{ return &model.JobLinkResultList{
ListQuery: &queryString,
Items: items, Items: items,
Count: &cnt, Count: &cnt,
}, nil }, nil

View File

@ -208,6 +208,9 @@ func BuildWhereClause(filter *model.JobFilter, query sq.SelectBuilder) sq.Select
if filter.NumHWThreads != nil { if filter.NumHWThreads != nil {
query = buildIntCondition("job.num_hwthreads", filter.NumHWThreads, query) query = buildIntCondition("job.num_hwthreads", filter.NumHWThreads, query)
} }
if filter.Node != nil {
query = buildStringCondition("job.resources", filter.Node, query)
}
if filter.FlopsAnyAvg != nil { if filter.FlopsAnyAvg != nil {
query = buildFloatCondition("job.flops_any_avg", filter.FlopsAnyAvg, query) query = buildFloatCondition("job.flops_any_avg", filter.FlopsAnyAvg, query)
} }