Refactor and adapt to new API

Jan Eitzinger 2023-06-09 09:09:41 +02:00
parent 02752f52e4
commit 5ba6f0ed3a
9 changed files with 239 additions and 278 deletions

View File

@@ -286,10 +286,13 @@ type HistoPoint {
 type JobsStatistics {
   id: ID!                 # If `groupBy` was used, ID of the user/project/cluster
   name: String!           # if User-Statistics: Given Name of Account (ID) Owner
-  totalJobs: Int!         # Number of jobs that matched
-  shortJobs: Int!         # Number of jobs with a duration of less than 2 minutes
+  totalJobs: Int!         # Number of jobs
+  runningJobs: Int!       # Number of running jobs
+  shortJobs: Int!         # Number of jobs with a duration of less than duration
   totalWalltime: Int!     # Sum of the duration of all matched jobs in hours
+  totalNodeHours: Int!    # Sum of the node hours of all matched jobs
   totalCoreHours: Int!    # Sum of the core hours of all matched jobs
+  totalAccHours: Int!     # Sum of the gpu hours of all matched jobs
   histDuration: [HistoPoint!]!  # value: hour, count: number of jobs with a rounded duration of value
   histNumNodes: [HistoPoint!]!  # value: number of nodes, count: number of jobs with that number of nodes
 }
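For reference, a client query exercising the extended type could look like the following (a sketch: the `jobsStatistics` root field and the `CLUSTER` aggregate come from the existing schema; the Go constant wrapping the document is hypothetical):

package example

// Hypothetical client query document using the new fields; the field
// names come from the schema above, the constant itself is illustrative.
const jobsStatisticsQuery = `
query {
  jobsStatistics(groupBy: CLUSTER) {
    id
    totalJobs
    runningJobs
    shortJobs
    totalWalltime
    totalNodeHours
    totalCoreHours
  }
}`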

View File

@@ -143,9 +143,11 @@ type ComplexityRoot struct {
 		HistNumNodes   func(childComplexity int) int
 		ID             func(childComplexity int) int
 		Name           func(childComplexity int) int
+		RunningJobs    func(childComplexity int) int
 		ShortJobs      func(childComplexity int) int
 		TotalCoreHours func(childComplexity int) int
 		TotalJobs      func(childComplexity int) int
+		TotalNodeHours func(childComplexity int) int
 		TotalWalltime  func(childComplexity int) int
 	}
@@ -731,6 +733,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
 		return e.complexity.JobsStatistics.Name(childComplexity), true

+	case "JobsStatistics.runningJobs":
+		if e.complexity.JobsStatistics.RunningJobs == nil {
+			break
+		}
+
+		return e.complexity.JobsStatistics.RunningJobs(childComplexity), true
+
 	case "JobsStatistics.shortJobs":
 		if e.complexity.JobsStatistics.ShortJobs == nil {
 			break
@@ -752,6 +761,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
 		return e.complexity.JobsStatistics.TotalJobs(childComplexity), true

+	case "JobsStatistics.totalNodeHours":
+		if e.complexity.JobsStatistics.TotalNodeHours == nil {
+			break
+		}
+
+		return e.complexity.JobsStatistics.TotalNodeHours(childComplexity), true
+
 	case "JobsStatistics.totalWalltime":
 		if e.complexity.JobsStatistics.TotalWalltime == nil {
 			break
@@ -1764,9 +1780,11 @@ type HistoPoint {
 type JobsStatistics {
   id: ID!                 # If ` + "`" + `groupBy` + "`" + ` was used, ID of the user/project/cluster
   name: String!           # if User-Statistics: Given Name of Account (ID) Owner
-  totalJobs: Int!         # Number of jobs that matched
-  shortJobs: Int!         # Number of jobs with a duration of less than 2 minutes
+  totalJobs: Int!         # Number of jobs
+  runningJobs: Int!       # Number of running jobs
+  shortJobs: Int!         # Number of jobs with a duration of less than duration
   totalWalltime: Int!     # Sum of the duration of all matched jobs in hours
+  totalNodeHours: Int!    # Sum of the node hours of all matched jobs
   totalCoreHours: Int!    # Sum of the core hours of all matched jobs
   histDuration: [HistoPoint!]!  # value: hour, count: number of jobs with a rounded duration of value
   histNumNodes: [HistoPoint!]!  # value: number of nodes, count: number of jobs with that number of nodes
@@ -4884,6 +4902,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalJobs(ctx context.Co
 	return fc, nil
 }

+func (ec *executionContext) _JobsStatistics_runningJobs(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
+	fc, err := ec.fieldContext_JobsStatistics_runningJobs(ctx, field)
+	if err != nil {
+		return graphql.Null
+	}
+	ctx = graphql.WithFieldContext(ctx, fc)
+	defer func() {
+		if r := recover(); r != nil {
+			ec.Error(ctx, ec.Recover(ctx, r))
+			ret = graphql.Null
+		}
+	}()
+	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
+		ctx = rctx // use context from middleware stack in children
+		return obj.RunningJobs, nil
+	})
+	if err != nil {
+		ec.Error(ctx, err)
+		return graphql.Null
+	}
+	if resTmp == nil {
+		if !graphql.HasFieldError(ctx, fc) {
+			ec.Errorf(ctx, "must not be null")
+		}
+		return graphql.Null
+	}
+	res := resTmp.(int)
+	fc.Result = res
+	return ec.marshalNInt2int(ctx, field.Selections, res)
+}
+
+func (ec *executionContext) fieldContext_JobsStatistics_runningJobs(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+	fc = &graphql.FieldContext{
+		Object:     "JobsStatistics",
+		Field:      field,
+		IsMethod:   false,
+		IsResolver: false,
+		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
+			return nil, errors.New("field of type Int does not have child fields")
+		},
+	}
+	return fc, nil
+}
+
 func (ec *executionContext) _JobsStatistics_shortJobs(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
 	fc, err := ec.fieldContext_JobsStatistics_shortJobs(ctx, field)
 	if err != nil {
@@ -4972,6 +5034,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalWalltime(ctx contex
 	return fc, nil
 }

+func (ec *executionContext) _JobsStatistics_totalNodeHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
+	fc, err := ec.fieldContext_JobsStatistics_totalNodeHours(ctx, field)
+	if err != nil {
+		return graphql.Null
+	}
+	ctx = graphql.WithFieldContext(ctx, fc)
+	defer func() {
+		if r := recover(); r != nil {
+			ec.Error(ctx, ec.Recover(ctx, r))
+			ret = graphql.Null
+		}
+	}()
+	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
+		ctx = rctx // use context from middleware stack in children
+		return obj.TotalNodeHours, nil
+	})
+	if err != nil {
+		ec.Error(ctx, err)
+		return graphql.Null
+	}
+	if resTmp == nil {
+		if !graphql.HasFieldError(ctx, fc) {
+			ec.Errorf(ctx, "must not be null")
+		}
+		return graphql.Null
+	}
+	res := resTmp.(int)
+	fc.Result = res
+	return ec.marshalNInt2int(ctx, field.Selections, res)
+}
+
+func (ec *executionContext) fieldContext_JobsStatistics_totalNodeHours(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+	fc = &graphql.FieldContext{
+		Object:     "JobsStatistics",
+		Field:      field,
+		IsMethod:   false,
+		IsResolver: false,
+		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
+			return nil, errors.New("field of type Int does not have child fields")
+		},
+	}
+	return fc, nil
+}
+
 func (ec *executionContext) _JobsStatistics_totalCoreHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
 	fc, err := ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field)
 	if err != nil {
@@ -6867,10 +6973,14 @@ func (ec *executionContext) fieldContext_Query_jobsStatistics(ctx context.Contex
 				return ec.fieldContext_JobsStatistics_name(ctx, field)
 			case "totalJobs":
 				return ec.fieldContext_JobsStatistics_totalJobs(ctx, field)
+			case "runningJobs":
+				return ec.fieldContext_JobsStatistics_runningJobs(ctx, field)
 			case "shortJobs":
 				return ec.fieldContext_JobsStatistics_shortJobs(ctx, field)
 			case "totalWalltime":
 				return ec.fieldContext_JobsStatistics_totalWalltime(ctx, field)
+			case "totalNodeHours":
+				return ec.fieldContext_JobsStatistics_totalNodeHours(ctx, field)
 			case "totalCoreHours":
 				return ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field)
 			case "histDuration":
@@ -12062,6 +12172,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti
 			out.Values[i] = ec._JobsStatistics_totalJobs(ctx, field, obj)

+			if out.Values[i] == graphql.Null {
+				invalids++
+			}
+		case "runningJobs":
+
+			out.Values[i] = ec._JobsStatistics_runningJobs(ctx, field, obj)
+
 			if out.Values[i] == graphql.Null {
 				invalids++
 			}
@@ -12076,6 +12193,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti
 			out.Values[i] = ec._JobsStatistics_totalWalltime(ctx, field, obj)

+			if out.Values[i] == graphql.Null {
+				invalids++
+			}
+		case "totalNodeHours":
+
+			out.Values[i] = ec._JobsStatistics_totalNodeHours(ctx, field, obj)
+
 			if out.Values[i] == graphql.Null {
 				invalids++
 			}

View File

@@ -90,8 +90,10 @@ type JobsStatistics struct {
 	ID             string        `json:"id"`
 	Name           string        `json:"name"`
 	TotalJobs      int           `json:"totalJobs"`
+	RunningJobs    int           `json:"runningJobs"`
 	ShortJobs      int           `json:"shortJobs"`
 	TotalWalltime  int           `json:"totalWalltime"`
+	TotalNodeHours int           `json:"totalNodeHours"`
 	TotalCoreHours int           `json:"totalCoreHours"`
 	HistDuration   []*HistoPoint `json:"histDuration"`
 	HistNumNodes   []*HistoPoint `json:"histNumNodes"`

View File

@@ -11,7 +11,6 @@ import (
 	"strconv"
 	"time"

-	"github.com/99designs/gqlgen/graphql"
 	"github.com/ClusterCockpit/cc-backend/internal/auth"
 	"github.com/ClusterCockpit/cc-backend/internal/graph/generated"
 	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
@@ -234,20 +233,12 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str
 }

 // JobsFootprints is the resolver for the jobsFootprints field.
-func (r *queryResolver) JobsFootprints(
-	ctx context.Context,
-	filter []*model.JobFilter,
-	metrics []string) (*model.Footprints, error) {
+func (r *queryResolver) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) {
 	return r.jobsFootprints(ctx, filter, metrics)
 }

 // Jobs is the resolver for the jobs field.
-func (r *queryResolver) Jobs(
-	ctx context.Context,
-	filter []*model.JobFilter,
-	page *model.PageRequest,
-	order *model.OrderByInput) (*model.JobResultList, error) {
+func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) {
 	if page == nil {
 		page = &model.PageRequest{
 			ItemsPerPage: 50,
@@ -276,23 +267,26 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
 	var stats []*model.JobsStatistics

 	if requireField(ctx, "totalJobs") {
-		if requireField(ctx, "totalCoreHours") {
-			if groupBy == nil {
-				stats, err = r.Repo.JobsStatsPlain(ctx, filter)
-			} else {
-				stats, err = r.Repo.JobsStats(ctx, filter, groupBy)
-			}
-		} else {
-			if groupBy == nil {
-				stats, err = r.Repo.JobsStatsPlainNoCoreH(ctx, filter)
-			} else {
-				stats, err = r.Repo.JobsStatsNoCoreH(ctx, filter, groupBy)
-			}
+		if groupBy == nil {
+			stats, err = r.Repo.JobsStats(ctx, filter)
+		} else {
+			stats, err = r.Repo.JobsStatsGrouped(ctx, filter, groupBy)
 		}
 	} else {
 		stats = make([]*model.JobsStatistics, 0, 1)
 	}

+	if groupBy != nil {
+		if requireField(ctx, "shortJobs") {
+			stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short")
+		}
+		if requireField(ctx, "RunningJobs") {
+			stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running")
+		}
+	} else {
+		return nil, errors.New("Job counts only implemented with groupBy argument")
+	}
+
 	if err != nil {
 		return nil, err
 	}
@@ -312,13 +306,7 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
 }

 // JobsCount is the resolver for the jobsCount field.
-func (r *queryResolver) JobsCount(
-	ctx context.Context,
-	filter []*model.JobFilter,
-	groupBy model.Aggregate,
-	weight *model.Weights,
-	limit *int) ([]*model.Count, error) {
+func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) {
 	counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, weight, limit)
 	if err != nil {
 		log.Warn("Error while counting grouped jobs")
@@ -412,21 +400,3 @@ type jobResolver struct{ *Resolver }
 type mutationResolver struct{ *Resolver }
 type queryResolver struct{ *Resolver }
 type subClusterResolver struct{ *Resolver }
-
-// !!! WARNING !!!
-// The code below was going to be deleted when updating resolvers. It has been copied here so you have
-// one last chance to move it out of harms way if you want. There are two reasons this happens:
-//  - When renaming or deleting a resolver the old code will be put in here. You can safely delete
-//    it when you're done.
-//  - You have helper methods in this file. Move them out to keep these resolver files clean.
-func requireField(ctx context.Context, name string) bool {
-	fields := graphql.CollectAllFields(ctx)
-
-	for _, f := range fields {
-		if f == name {
-			return true
-		}
-	}
-
-	return false
-}

View File

@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"math"

+	"github.com/99designs/gqlgen/graphql"
 	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
 	"github.com/ClusterCockpit/cc-backend/internal/metricdata"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
@@ -132,3 +133,15 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
 		Metrics: res,
 	}, nil
 }
+
+func requireField(ctx context.Context, name string) bool {
+	fields := graphql.CollectAllFields(ctx)
+
+	for _, f := range fields {
+		if f == name {
+			return true
+		}
+	}
+
+	return false
+}
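For context, `requireField` wraps gqlgen's `graphql.CollectAllFields`, which lists the field names in the current selection set; the comparison is case-sensitive. A minimal sketch of the intended gating pattern (the wrapper method is hypothetical; `r.Repo.JobsStats` and `model.JobsStatistics` appear in the diffs above):

// Hypothetical resolver fragment: skip the aggregate query entirely
// when the client did not select the totalJobs field.
func (r *queryResolver) statsIfRequested(ctx context.Context, filter []*model.JobFilter) ([]*model.JobsStatistics, error) {
	if !requireField(ctx, "totalJobs") {
		return make([]*model.JobsStatistics, 0, 1), nil // no DB round-trip needed
	}
	return r.Repo.JobsStats(ctx, filter)
}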

View File

@@ -13,7 +13,6 @@ import (
 	"github.com/ClusterCockpit/cc-backend/internal/auth"
 	"github.com/ClusterCockpit/cc-backend/internal/config"
 	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
-	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
 	sq "github.com/Masterminds/squirrel"
 )
@@ -62,14 +61,18 @@ func (r *JobRepository) buildStatsQuery(
 	castType := r.getCastType()

 	if col != "" {
-		// Scan columns: id, totalJobs, totalWalltime
+		// Scan columns: id, totalJobs, totalWalltime, totalNodeHours, totalCoreHours
 		query = sq.Select(col, "COUNT(job.id)",
 			fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
+			fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType),
+			fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType),
 		).From("job").GroupBy(col)
 	} else {
-		// Scan columns: totalJobs, totalWalltime
+		// Scan columns: totalJobs, totalWalltime, totalNodeHours, totalCoreHours
 		query = sq.Select("COUNT(job.id)",
 			fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
+			fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType),
+			fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType),
 		).From("job")
 	}
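Node hours and core hours are thus duration-weighted sums divided by 3600. A self-contained sketch of roughly the SQL the grouped branch builds, assuming `col = "job.cluster"` and a cast type of `int` (the real cast type comes from `getCastType()` and depends on the configured database driver):

package main

import (
	"fmt"

	sq "github.com/Masterminds/squirrel"
)

func main() {
	castType := "int" // assumed placeholder; getCastType() picks this per driver
	query := sq.Select("job.cluster", "COUNT(job.id)",
		fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
		fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType),
		fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType),
	).From("job").GroupBy("job.cluster")

	sql, _, _ := query.ToSql()
	fmt.Println(sql)
	// SELECT job.cluster, COUNT(job.id), CAST(ROUND(SUM(job.duration) / 3600) as int), ...
	// FROM job GROUP BY job.cluster
}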
@@ -105,8 +108,7 @@ func (r *JobRepository) getCastType() string {
 	return castType
 }

-// with groupBy and without coreHours
-func (r *JobRepository) JobsStatsNoCoreH(
+func (r *JobRepository) JobsStatsGrouped(
 	ctx context.Context,
 	filter []*model.JobFilter,
 	groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
@@ -129,8 +131,8 @@ func (r *JobRepository) JobsStatsNoCoreH(
 	for rows.Next() {
 		var id sql.NullString
-		var jobs, walltime sql.NullInt64
-		if err := rows.Scan(&id, &jobs, &walltime); err != nil {
+		var jobs, walltime, nodeHours, coreHours sql.NullInt64
+		if err := rows.Scan(&id, &jobs, &walltime, &nodeHours, &coreHours); err != nil {
 			log.Warn("Error while scanning rows")
 			return nil, err
 		}
@@ -141,7 +143,7 @@ func (r *JobRepository) JobsStatsNoCoreH(
 			stats = append(stats,
 				&model.JobsStatistics{
 					ID:            id.String,
-					Name:          &name,
+					Name:          name,
 					TotalJobs:     int(jobs.Int64),
 					TotalWalltime: int(walltime.Int64)})
 		} else {
@@ -158,8 +160,7 @@ func (r *JobRepository) JobsStatsNoCoreH(
 	return stats, nil
 }

-// without groupBy and without coreHours
-func (r *JobRepository) JobsStatsPlainNoCoreH(
+func (r *JobRepository) JobsStats(
 	ctx context.Context,
 	filter []*model.JobFilter) ([]*model.JobsStatistics, error) {
@@ -172,175 +173,32 @@ func (r *JobRepository) JobsStatsPlainNoCoreH(
 	row := query.RunWith(r.DB).QueryRow()
 	stats := make([]*model.JobsStatistics, 0, 1)

-	var jobs, walltime sql.NullInt64
-	if err := row.Scan(&jobs, &walltime); err != nil {
+	var jobs, walltime, nodeHours, coreHours sql.NullInt64
+	if err := row.Scan(&jobs, &walltime, &nodeHours, &coreHours); err != nil {
 		log.Warn("Error while scanning rows")
 		return nil, err
 	}

 	if jobs.Valid {
-		query := r.buildCountQuery(filter, "short", "")
-		query, err := SecurityCheck(ctx, query)
-		if err != nil {
-			return nil, err
-		}
-		var cnt sql.NullInt64
-		if err := query.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil {
-			log.Warn("Error while scanning rows")
-			return nil, err
-		}
 		stats = append(stats,
 			&model.JobsStatistics{
 				TotalJobs:     int(jobs.Int64),
-				TotalWalltime: int(walltime.Int64),
-				ShortJobs:     int(cnt.Int64)})
+				TotalWalltime: int(walltime.Int64)})
 	}

 	log.Infof("Timer JobStatistics %s", time.Since(start))
 	return stats, nil
 }

-// without groupBy and with coreHours
-func (r *JobRepository) JobsStatsPlain(
-	ctx context.Context,
-	filter []*model.JobFilter) ([]*model.JobsStatistics, error) {
-
-	start := time.Now()
-	query := r.buildStatsQuery(filter, "")
-	query, err := SecurityCheck(ctx, query)
-	if err != nil {
-		return nil, err
-	}
-
-	castType := r.getCastType()
-	var totalJobs, totalWalltime, totalCoreHours int64
-
-	for _, cluster := range archive.Clusters {
-		for _, subcluster := range cluster.SubClusters {
-			scQuery := query.Column(fmt.Sprintf(
-				"CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)",
-				subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType))
-			scQuery = scQuery.Where("job.cluster = ?", cluster.Name).
-				Where("job.subcluster = ?", subcluster.Name)
-
-			row := scQuery.RunWith(r.DB).QueryRow()
-			var jobs, walltime, corehours sql.NullInt64
-			if err := row.Scan(&jobs, &walltime, &corehours); err != nil {
-				log.Warn("Error while scanning rows")
-				return nil, err
-			}
-
-			if jobs.Valid {
-				totalJobs += jobs.Int64
-				totalWalltime += walltime.Int64
-				totalCoreHours += corehours.Int64
-			}
-		}
-	}
-
-	stats := make([]*model.JobsStatistics, 0, 1)
-	stats = append(stats,
-		&model.JobsStatistics{
-			TotalJobs:      int(totalJobs),
-			TotalWalltime:  int(totalWalltime),
-			TotalCoreHours: int(totalCoreHours)})
-
-	log.Infof("Timer JobStatistics %s", time.Since(start))
-	return stats, nil
-}
-
-// with groupBy and with coreHours
-func (r *JobRepository) JobsStats(
+func (r *JobRepository) JobCountGrouped(
 	ctx context.Context,
 	filter []*model.JobFilter,
 	groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {

 	start := time.Now()
-	stats := map[string]*model.JobsStatistics{}
 	col := groupBy2column[*groupBy]
-	query := r.buildStatsQuery(filter, col)
-	query, err := SecurityCheck(ctx, query)
-	if err != nil {
-		return nil, err
-	}
-
-	castType := r.getCastType()
-
-	for _, cluster := range archive.Clusters {
-		for _, subcluster := range cluster.SubClusters {
-			scQuery := query.Column(fmt.Sprintf(
-				"CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)",
-				subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType))
-			scQuery = scQuery.Where("job.cluster = ?", cluster.Name).
-				Where("job.subcluster = ?", subcluster.Name)
-
-			rows, err := scQuery.RunWith(r.DB).Query()
-			if err != nil {
-				log.Warn("Error while querying DB for job statistics")
-				return nil, err
-			}
-
-			for rows.Next() {
-				var id sql.NullString
-				var jobs, walltime, corehours sql.NullInt64
-				if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil {
-					log.Warn("Error while scanning rows")
-					return nil, err
-				}
-
-				if s, ok := stats[id.String]; ok {
-					s.TotalJobs += int(jobs.Int64)
-					s.TotalWalltime += int(walltime.Int64)
-					s.TotalCoreHours += int(corehours.Int64)
-				} else {
-					if col == "job.user" {
-						name := r.getUserName(ctx, id.String)
-						stats[id.String] = &model.JobsStatistics{
-							ID:             id.String,
-							Name:           &name,
-							TotalJobs:      int(jobs.Int64),
-							TotalWalltime:  int(walltime.Int64),
-							TotalCoreHours: int(corehours.Int64),
-						}
-					} else {
-						stats[id.String] = &model.JobsStatistics{
-							ID:             id.String,
-							TotalJobs:      int(jobs.Int64),
-							TotalWalltime:  int(walltime.Int64),
-							TotalCoreHours: int(corehours.Int64),
-						}
-					}
-				}
-			}
-		}
-	}
-
-	res := make([]*model.JobsStatistics, 0, len(stats))
-	for _, stat := range stats {
-		res = append(res, stat)
-	}
-
-	log.Infof("Timer JobStatistics %s", time.Since(start))
-	return res, nil
-}
-
-type jobCountResult struct {
-	id          string
-	shortJobs   int
-	totalJobs   int
-	runningJobs int
-}
-
-func (r *JobRepository) JobCounts(
-	ctx context.Context,
-	filter []*model.JobFilter) ([]*model.JobsStatistics, error) {
-
-	counts := make(map[string]jobCountResult)
-	start := time.Now()
-	query := r.buildCountQuery(filter, "short", "cluster")
+	query := r.buildCountQuery(filter, "", col)
 	query, err := SecurityCheck(ctx, query)
 	if err != nil {
 		return nil, err
@@ -351,6 +209,8 @@ func (r *JobRepository) JobCounts(
 		return nil, err
 	}

+	stats := make([]*model.JobsStatistics, 0, 100)
+
 	for rows.Next() {
 		var id sql.NullString
 		var cnt sql.NullInt64
@@ -359,21 +219,39 @@
 			return nil, err
 		}
 		if id.Valid {
-			counts[id.String] = jobCountResult{id: id.String, shortJobs: int(cnt.Int64)}
+			stats = append(stats,
+				&model.JobsStatistics{
+					ID:        id.String,
+					TotalJobs: int(cnt.Int64)})
 		}
 	}

-	query = r.buildCountQuery(filter, "running", "cluster")
-	query, err = SecurityCheck(ctx, query)
+	log.Infof("Timer JobStatistics %s", time.Since(start))
+	return stats, nil
+}
+
+func (r *JobRepository) AddJobCountGrouped(
+	ctx context.Context,
+	filter []*model.JobFilter,
+	groupBy *model.Aggregate,
+	stats []*model.JobsStatistics,
+	kind string) ([]*model.JobsStatistics, error) {
+
+	start := time.Now()
+	col := groupBy2column[*groupBy]
+	query := r.buildCountQuery(filter, kind, col)
+	query, err := SecurityCheck(ctx, query)
 	if err != nil {
 		return nil, err
 	}
-	rows, err = query.RunWith(r.DB).Query()
+	rows, err := query.RunWith(r.DB).Query()
 	if err != nil {
 		log.Warn("Error while querying DB for job statistics")
 		return nil, err
 	}
+
+	counts := make(map[string]int)
 	for rows.Next() {
 		var id sql.NullString
 		var cnt sql.NullInt64
@@ -382,18 +260,21 @@
 			return nil, err
 		}
 		if id.Valid {
-			counts[id.String].runningJobs = int(cnt.Int64)
+			counts[id.String] = int(cnt.Int64)
 		}
 	}

-	stats := make([]*model.JobsStatistics, 0, 20)
-	if id.Valid {
-		stats = append(stats,
-			&model.JobsStatistics{
-				ID:          id.String,
-				TotalJobs:   int(jobs.Int64),
-				RunningJobs: int(walltime.Int64)})
+	switch kind {
+	case "running":
+		for _, s := range stats {
+			s.RunningJobs = counts[s.ID]
+		}
+	case "short":
+		for _, s := range stats {
+			s.ShortJobs = counts[s.ID]
+		}
 	}

 	log.Infof("Timer JobStatistics %s", time.Since(start))
 	return stats, nil
 }
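The repository now splits statistics into two steps: one grouped query for the totals, then `AddJobCountGrouped` merges per-kind counts into the existing entries, matched by ID. A hypothetical helper showing how the pieces compose (it mirrors the resolver and `setupHomeRoute` call sites; `internal` packages are not importable outside the module, so this is illustrative only):

package example

import (
	"context"

	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
	"github.com/ClusterCockpit/cc-backend/internal/repository"
)

// clusterStatistics sketches the two-step pattern per cluster.
func clusterStatistics(ctx context.Context, repo *repository.JobRepository,
	filter []*model.JobFilter) ([]*model.JobsStatistics, error) {

	groupBy := model.AggregateCluster

	// Step 1: grouped totals (totalJobs, totalWalltime, node and core hours).
	stats, err := repo.JobsStatsGrouped(ctx, filter, &groupBy)
	if err != nil {
		return nil, err
	}

	// Step 2: annotate each entry with running and short job counts.
	stats, err = repo.AddJobCountGrouped(ctx, filter, &groupBy, stats, "running")
	if err != nil {
		return nil, err
	}
	return repo.AddJobCountGrouped(ctx, filter, &groupBy, stats, "short")
}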

View File

@@ -14,12 +14,9 @@ import (
 	"github.com/ClusterCockpit/cc-backend/internal/api"
 	"github.com/ClusterCockpit/cc-backend/internal/auth"
-	"github.com/ClusterCockpit/cc-backend/internal/config"
 	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
 	"github.com/ClusterCockpit/cc-backend/internal/repository"
-	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
-	"github.com/ClusterCockpit/cc-backend/pkg/schema"
 	"github.com/ClusterCockpit/cc-backend/web"
 	"github.com/gorilla/mux"
 )
@@ -50,47 +47,20 @@
 }

 func setupHomeRoute(i InfoType, r *http.Request) InfoType {
-	type cluster struct {
-		Name            string
-		RunningJobs     int
-		TotalJobs       int
-		RecentShortJobs int
-	}
-
 	jobRepo := repository.GetJobRepository()
+	groupBy := model.AggregateCluster

-	runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
-		State: []schema.JobState{schema.JobStateRunning},
-	}}, nil, nil)
+	stats, err := jobRepo.JobCountGrouped(r.Context(), nil, &groupBy)
 	if err != nil {
 		log.Warnf("failed to count jobs: %s", err.Error())
-		runningJobs = map[string]int{}
-	}
-	totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil, nil)
-	if err != nil {
-		log.Warnf("failed to count jobs: %s", err.Error())
-		totalJobs = map[string]int{}
-	}
-	from := time.Now().Add(-24 * time.Hour)
-	recentShortJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
-		StartTime: &schema.TimeRange{From: &from, To: nil},
-		Duration:  &schema.IntRange{From: 0, To: config.Keys.ShortRunningJobsDuration},
-	}}, nil, nil)
-	if err != nil {
-		log.Warnf("failed to count jobs: %s", err.Error())
-		recentShortJobs = map[string]int{}
 	}

-	clusters := make([]cluster, 0)
-	for _, c := range archive.Clusters {
-		clusters = append(clusters, cluster{
-			Name:            c.Name,
-			RunningJobs:     runningJobs[c.Name],
-			TotalJobs:       totalJobs[c.Name],
-			RecentShortJobs: recentShortJobs[c.Name],
-		})
+	stats, err = jobRepo.AddJobCountGrouped(r.Context(), nil, &groupBy, stats, "running")
+	if err != nil {
+		log.Warnf("failed to count running jobs: %s", err.Error())
 	}

-	i["clusters"] = clusters
+	i["clusters"] = stats
 	return i
 }
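`i["clusters"]` now carries `[]*model.JobsStatistics` instead of the removed local `cluster` struct, which is why the home template below reads `.ID`, `.RunningJobs` and `.TotalJobs`. Roughly the shape the template receives (cluster names and counts invented for illustration):

// Sketch of the template data after this change; values are made up.
i["clusters"] = []*model.JobsStatistics{
	{ID: "fritz", TotalJobs: 1234, RunningJobs: 56},
	{ID: "alex", TotalJobs: 789, RunningJobs: 12},
}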

View File

@@ -25,6 +25,7 @@ type BaseJob struct {
 	Partition    string `json:"partition,omitempty" db:"partition" example:"main"`       // The Slurm partition to which the job was submitted
 	ArrayJobId   int64  `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` // The unique identifier of an array job
 	NumNodes     int32  `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"`         // Number of nodes used (Min > 0)
+	// NumCores     int32  `json:"numCores" db:"num_cores" example:"20" minimum:"1"`        // Number of HWThreads used (Min > 0)
 	NumHWThreads int32  `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
 	NumAcc       int32  `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"`   // Number of accelerators used (Min > 0)
 	Exclusive    int32  `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user

View File

@@ -6,9 +6,8 @@
 	<thead>
 		<tr>
 			<th>Name</th>
-			<th>Running Jobs (short ones not listed)</th>
+			<th>Running Jobs</th>
 			<th>Total Jobs</th>
-			<th>Short Jobs in past 24h</th>
 			{{if .User.HasRole .Roles.admin}}
 			<th>System View</th>
 			<th>Analysis View</th>
@@ -19,21 +18,19 @@
 		{{if .User.HasRole .Roles.admin}}
 		{{range .Infos.clusters}}
 		<tr>
-			<td>{{.Name}}</td>
-			<td><a href="/monitoring/jobs/?cluster={{.Name}}&state=running">{{.RunningJobs}} jobs</a></td>
-			<td><a href="/monitoring/jobs/?cluster={{.Name}}">{{.TotalJobs}} jobs</a></td>
-			<td>{{.RecentShortJobs}}</td>
-			<td><a href="/monitoring/systems/{{.Name}}">System View</a></td>
-			<td><a href="/monitoring/analysis/{{.Name}}">Analysis View</a></td>
+			<td>{{.ID}}</td>
+			<td><a href="/monitoring/jobs/?cluster={{.ID}}&state=running">{{.RunningJobs}} jobs</a></td>
+			<td><a href="/monitoring/jobs/?cluster={{.ID}}">{{.TotalJobs}} jobs</a></td>
+			<td><a href="/monitoring/systems/{{.ID}}">System View</a></td>
+			<td><a href="/monitoring/analysis/{{.ID}}">Analysis View</a></td>
 		</tr>
 		{{end}}
 		{{else}}
 		{{range .Infos.clusters}}
 		<tr>
-			<td>{{.Name}}</td>
-			<td><a href="/monitoring/jobs/?cluster={{.Name}}&state=running">{{.RunningJobs}} jobs</a></td>
-			<td><a href="/monitoring/jobs/?cluster={{.Name}}">{{.TotalJobs}} jobs</a></td>
-			<td>{{.RecentShortJobs}}</td>
+			<td>{{.ID}}</td>
+			<td><a href="/monitoring/jobs/?cluster={{.ID}}&state=running">{{.RunningJobs}} jobs</a></td>
+			<td><a href="/monitoring/jobs/?cluster={{.ID}}">{{.TotalJobs}} jobs</a></td>
 		</tr>
 		{{end}}
 		{{end}}