Add subcluster and walltime to Job types

Author: Lou Knauer, 2022-03-14 09:08:02 +01:00
Parent: 82720c7580
Commit: 2651b96499
6 changed files with 131 additions and 9 deletions

View File

@@ -98,8 +98,10 @@ type ComplexityRoot struct {
 		SMT        func(childComplexity int) int
 		StartTime  func(childComplexity int) int
 		State      func(childComplexity int) int
+		SubCluster func(childComplexity int) int
 		Tags       func(childComplexity int) int
 		User       func(childComplexity int) int
+		Walltime   func(childComplexity int) int
 	}

 	JobMetric struct {
@@ -503,6 +505,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
 		return e.complexity.Job.State(childComplexity), true

+	case "Job.subCluster":
+		if e.complexity.Job.SubCluster == nil {
+			break
+		}
+
+		return e.complexity.Job.SubCluster(childComplexity), true
+
 	case "Job.tags":
 		if e.complexity.Job.Tags == nil {
 			break
@@ -517,6 +526,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
 		return e.complexity.Job.User(childComplexity), true

+	case "Job.walltime":
+		if e.complexity.Job.Walltime == nil {
+			break
+		}
+
+		return e.complexity.Job.Walltime(childComplexity), true
+
 	case "JobMetric.scope":
 		if e.complexity.JobMetric.Scope == nil {
 			break
@@ -1212,8 +1228,10 @@ type Job {
 	user: String!
 	project: String!
 	cluster: String!
+	subCluster: String!
 	startTime: Time!
 	duration: Int!
+	walltime: Int!
 	numNodes: Int!
 	numHWThreads: Int!
 	numAcc: Int!
@@ -2648,6 +2666,41 @@ func (ec *executionContext) _Job_cluster(ctx context.Context, field graphql.Coll
 	return ec.marshalNString2string(ctx, field.Selections, res)
 }

+func (ec *executionContext) _Job_subCluster(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) {
+	defer func() {
+		if r := recover(); r != nil {
+			ec.Error(ctx, ec.Recover(ctx, r))
+			ret = graphql.Null
+		}
+	}()
+	fc := &graphql.FieldContext{
+		Object:     "Job",
+		Field:      field,
+		Args:       nil,
+		IsMethod:   false,
+		IsResolver: false,
+	}
+
+	ctx = graphql.WithFieldContext(ctx, fc)
+	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
+		ctx = rctx // use context from middleware stack in children
+		return obj.SubCluster, nil
+	})
+	if err != nil {
+		ec.Error(ctx, err)
+		return graphql.Null
+	}
+	if resTmp == nil {
+		if !graphql.HasFieldError(ctx, fc) {
+			ec.Errorf(ctx, "must not be null")
+		}
+		return graphql.Null
+	}
+	res := resTmp.(string)
+	fc.Result = res
+	return ec.marshalNString2string(ctx, field.Selections, res)
+}
+
 func (ec *executionContext) _Job_startTime(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) {
 	defer func() {
 		if r := recover(); r != nil {
@@ -2718,6 +2771,41 @@ func (ec *executionContext) _Job_duration(ctx context.Context, field graphql.Col
 	return ec.marshalNInt2int32(ctx, field.Selections, res)
 }

+func (ec *executionContext) _Job_walltime(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) {
+	defer func() {
+		if r := recover(); r != nil {
+			ec.Error(ctx, ec.Recover(ctx, r))
+			ret = graphql.Null
+		}
+	}()
+	fc := &graphql.FieldContext{
+		Object:     "Job",
+		Field:      field,
+		Args:       nil,
+		IsMethod:   false,
+		IsResolver: false,
+	}
+
+	ctx = graphql.WithFieldContext(ctx, fc)
+	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
+		ctx = rctx // use context from middleware stack in children
+		return obj.Walltime, nil
+	})
+	if err != nil {
+		ec.Error(ctx, err)
+		return graphql.Null
+	}
+	if resTmp == nil {
+		if !graphql.HasFieldError(ctx, fc) {
+			ec.Errorf(ctx, "must not be null")
+		}
+		return graphql.Null
+	}
+	res := resTmp.(int64)
+	fc.Result = res
+	return ec.marshalNInt2int64(ctx, field.Selections, res)
+}
+
 func (ec *executionContext) _Job_numNodes(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) {
 	defer func() {
 		if r := recover(); r != nil {
@@ -7695,6 +7783,11 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj
 			if out.Values[i] == graphql.Null {
 				atomic.AddUint32(&invalids, 1)
 			}
+		case "subCluster":
+			out.Values[i] = ec._Job_subCluster(ctx, field, obj)
+			if out.Values[i] == graphql.Null {
+				atomic.AddUint32(&invalids, 1)
+			}
 		case "startTime":
 			out.Values[i] = ec._Job_startTime(ctx, field, obj)
 			if out.Values[i] == graphql.Null {
@@ -7705,6 +7798,11 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj
 			if out.Values[i] == graphql.Null {
 				atomic.AddUint32(&invalids, 1)
 			}
+		case "walltime":
+			out.Values[i] = ec._Job_walltime(ctx, field, obj)
+			if out.Values[i] == graphql.Null {
+				atomic.AddUint32(&invalids, 1)
+			}
 		case "numNodes":
 			out.Values[i] = ec._Job_numNodes(ctx, field, obj)
 			if out.Values[i] == graphql.Null {
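Note: all hunks above are in gqlgen's generated resolver code, which is not edited by hand. Assuming the standard gqlgen setup used by this module, they are reproduced from the schema change in the next file via:

    go run github.com/99designs/gqlgen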

View File

@@ -11,8 +11,10 @@ type Job {
 	user: String!
 	project: String!
 	cluster: String!
+	subCluster: String!
 	startTime: Time!
 	duration: Int!
+	walltime: Int!
 	numNodes: Int!
 	numHWThreads: Int!
 	numAcc: Int!
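For reference, a minimal sketch of a client querying the two new fields. Only subCluster and walltime come from this commit; the /query endpoint, the jobs/items wrapper, and the jobId field are assumptions about the surrounding schema and server setup, which this diff does not show.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Select the two fields added by this commit alongside a few others.
	query := `{ jobs { items { jobId cluster subCluster duration walltime } } }`

	body, err := json.Marshal(map[string]string{"query": query})
	if err != nil {
		panic(err)
	}

	// Hypothetical local endpoint; adjust host and path to the actual deployment.
	resp, err := http.Post("http://localhost:8080/query", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var result map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		panic(err)
	}
	fmt.Println(result)
}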

View File

@@ -25,6 +25,7 @@ const JOBS_DB_SCHEMA string = `
 	id INTEGER PRIMARY KEY /*!40101 AUTO_INCREMENT */,
 	job_id BIGINT NOT NULL,
 	cluster VARCHAR(255) NOT NULL,
+	subcluster VARCHAR(255) NOT NULL,
 	start_time BIGINT NOT NULL, -- Unix timestamp

 	user VARCHAR(255) NOT NULL,
@@ -32,6 +33,7 @@ const JOBS_DB_SCHEMA string = `
 	` + "`partition`" + ` VARCHAR(255) NOT NULL, -- partition is a keyword in mysql -.-
 	array_job_id BIGINT NOT NULL,
 	duration INT,
+	walltime INT,
 	job_state VARCHAR(255) NOT NULL CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled', 'stopped', 'timeout', 'preempted', 'out_of_memory')),
 	meta_data TEXT, -- JSON
 	resources TEXT NOT NULL, -- JSON
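JOBS_DB_SCHEMA only takes effect when the job table is created from scratch, and the commit ships no migration, so a database created before it presumably needs the two columns added by hand. A minimal sketch, assuming a SQLite or MySQL database; the helper name and the NOT NULL DEFAULT '' for subcluster are assumptions, while the column names and types match the schema above.

package repository

import "github.com/jmoiron/sqlx"

// migrateJobTable adds the two columns introduced by this commit to an
// existing job table (hypothetical helper, not part of the commit).
func migrateJobTable(db *sqlx.DB) error {
	stmts := []string{
		`ALTER TABLE job ADD COLUMN subcluster VARCHAR(255) NOT NULL DEFAULT '';`,
		`ALTER TABLE job ADD COLUMN walltime INT;`,
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			return err
		}
	}
	return nil
}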

View File

@@ -16,12 +16,12 @@ import (
 )

 const NamedJobInsert string = `INSERT INTO job (
-	job_id, user, project, cluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
-	exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data,
+	job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
+	exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
 	mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total
 ) VALUES (
-	:job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
-	:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data,
+	:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
+	:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data,
 	:mem_used_max, :flops_any_avg, :mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total
 );`
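The :name placeholders are sqlx named parameters: each one is bound from the db struct tag of the same name on schema.BaseJob/JobMeta, so :subcluster and :walltime resolve to the new SubCluster and Walltime fields. A minimal usage sketch; the insertJob wrapper is hypothetical.

package repository

import (
	"github.com/ClusterCockpit/cc-backend/schema"
	"github.com/jmoiron/sqlx"
)

// insertJob shows how NamedJobInsert is consumed: sqlx maps :subcluster
// and :walltime to job.SubCluster and job.Walltime via their db tags.
func insertJob(db *sqlx.DB, job *schema.JobMeta) (int64, error) {
	res, err := db.NamedExec(NamedJobInsert, job)
	if err != nil {
		return -1, err
	}
	return res.LastInsertId()
}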

View File

@@ -13,6 +13,7 @@ import (
 	"github.com/ClusterCockpit/cc-backend/graph/model"
 	"github.com/ClusterCockpit/cc-backend/schema"
 	sq "github.com/Masterminds/squirrel"
+	"github.com/iamlouk/lrucache"
 	"github.com/jmoiron/sqlx"
 )
@@ -20,10 +21,12 @@ type JobRepository struct {
 	DB *sqlx.DB

 	stmtCache *sq.StmtCache
+	cache     *lrucache.Cache
 }

 func (r *JobRepository) Init() error {
 	r.stmtCache = sq.NewStmtCache(r.DB)
+	r.cache = lrucache.New(100)
 	return nil
 }
@@ -120,11 +123,11 @@ func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) {
 	}

 	res, err := r.DB.NamedExec(`INSERT INTO job (
-		job_id, user, project, cluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc,
-		exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data
+		job_id, user, project, cluster, subcluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc,
+		exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data
 	) VALUES (
-		:job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
-		:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data
+		:job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc,
+		:exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data
 	);`, job)
 	if err != nil {
 		return -1, err
@@ -260,3 +263,19 @@ func (r *JobRepository) FindJobOrUser(ctx context.Context, searchterm string) (j
 	return 0, "", ErrNotFound
 }
+
+func (r *JobRepository) Partitions(cluster string) ([]string, error) {
+	var err error
+	partitions := r.cache.Get("partitions:"+cluster, func() (interface{}, time.Duration, int) {
+		parts := []string{}
+		if err = r.DB.Select(&parts, `SELECT DISTINCT job.partition FROM job WHERE job.cluster = ?;`, cluster); err != nil {
+			return nil, 0, 1000
+		}
+		return parts, 1 * time.Hour, 1
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return partitions.([]string), nil
+}
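Partitions memoizes the DISTINCT lookup per cluster in the new lrucache. Assuming the semantics suggested by the call site (lrucache.New(100) caps the total cached size, and the compute closure returns value, TTL, and size), a successful result is kept for one hour at size 1, while the error path reports size 1000, larger than the whole cache, so failures are effectively not retained. A hypothetical caller:

package repository

import "github.com/jmoiron/sqlx"

// listPartitions is a hypothetical caller; "emmy" is a made-up cluster name.
func listPartitions(db *sqlx.DB) ([]string, error) {
	repo := &JobRepository{DB: db}
	if err := repo.Init(); err != nil {
		return nil, err
	}

	// The first call per cluster runs the SELECT; repeat calls within an
	// hour are served from the cache.
	return repo.Partitions("emmy")
}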

View File

@@ -14,6 +14,7 @@ type BaseJob struct {
 	User       string `json:"user" db:"user"`
 	Project    string `json:"project" db:"project"`
 	Cluster    string `json:"cluster" db:"cluster"`
+	SubCluster string `json:"subCluster" db:"subcluster"`
 	Partition  string `json:"partition" db:"partition"`
 	ArrayJobId int32  `json:"arrayJobId" db:"array_job_id"`
 	NumNodes   int32  `json:"numNodes" db:"num_nodes"`
@@ -24,6 +25,7 @@ type BaseJob struct {
 	SMT          int32       `json:"smt" db:"smt"`
 	State        JobState    `json:"jobState" db:"job_state"`
 	Duration     int32       `json:"duration" db:"duration"`
+	Walltime     int64       `json:"walltime" db:"walltime"`
 	Tags         []*Tag      `json:"tags"`
 	RawResources []byte      `json:"-" db:"resources"`
 	Resources    []*Resource `json:"resources"`
@@ -54,7 +56,6 @@ type Job struct {
 type JobMeta struct {
 	ID         *int64 `json:"id,omitempty"` // never used in the job-archive, only available via REST-API
 	BaseJob
-	Walltime   int64  `json:"walltime"` // TODO: Missing in DB
 	StartTime  int64  `json:"startTime" db:"start_time"`
 	Statistics map[string]JobStatistics `json:"statistics,omitempty"`
 }
} }