Merge pull request #124 from ClusterCockpit/refactor-job-repository

Refactor job repository
This commit is contained in:
Jan Eitzinger 2023-06-09 15:07:46 +02:00 committed by GitHub
commit f70c88d6b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 848 additions and 402 deletions

View File

@ -286,10 +286,13 @@ type HistoPoint {
type JobsStatistics { type JobsStatistics {
id: ID! # If `groupBy` was used, ID of the user/project/cluster id: ID! # If `groupBy` was used, ID of the user/project/cluster
name: String! # if User-Statistics: Given Name of Account (ID) Owner name: String! # if User-Statistics: Given Name of Account (ID) Owner
totalJobs: Int! # Number of jobs that matched totalJobs: Int! # Number of jobs
shortJobs: Int! # Number of jobs with a duration of less than 2 minutes runningJobs: Int! # Number of running jobs
shortJobs: Int! # Number of jobs with a duration of less than duration
totalWalltime: Int! # Sum of the duration of all matched jobs in hours totalWalltime: Int! # Sum of the duration of all matched jobs in hours
totalNodeHours: Int! # Sum of the node hours of all matched jobs
totalCoreHours: Int! # Sum of the core hours of all matched jobs totalCoreHours: Int! # Sum of the core hours of all matched jobs
totalAccHours: Int! # Sum of the gpu hours of all matched jobs
histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value
histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes
} }

View File

@ -143,9 +143,12 @@ type ComplexityRoot struct {
HistNumNodes func(childComplexity int) int HistNumNodes func(childComplexity int) int
ID func(childComplexity int) int ID func(childComplexity int) int
Name func(childComplexity int) int Name func(childComplexity int) int
RunningJobs func(childComplexity int) int
ShortJobs func(childComplexity int) int ShortJobs func(childComplexity int) int
TotalAccHours func(childComplexity int) int
TotalCoreHours func(childComplexity int) int TotalCoreHours func(childComplexity int) int
TotalJobs func(childComplexity int) int TotalJobs func(childComplexity int) int
TotalNodeHours func(childComplexity int) int
TotalWalltime func(childComplexity int) int TotalWalltime func(childComplexity int) int
} }
@ -731,6 +734,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.JobsStatistics.Name(childComplexity), true return e.complexity.JobsStatistics.Name(childComplexity), true
case "JobsStatistics.runningJobs":
if e.complexity.JobsStatistics.RunningJobs == nil {
break
}
return e.complexity.JobsStatistics.RunningJobs(childComplexity), true
case "JobsStatistics.shortJobs": case "JobsStatistics.shortJobs":
if e.complexity.JobsStatistics.ShortJobs == nil { if e.complexity.JobsStatistics.ShortJobs == nil {
break break
@ -738,6 +748,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.JobsStatistics.ShortJobs(childComplexity), true return e.complexity.JobsStatistics.ShortJobs(childComplexity), true
case "JobsStatistics.totalAccHours":
if e.complexity.JobsStatistics.TotalAccHours == nil {
break
}
return e.complexity.JobsStatistics.TotalAccHours(childComplexity), true
case "JobsStatistics.totalCoreHours": case "JobsStatistics.totalCoreHours":
if e.complexity.JobsStatistics.TotalCoreHours == nil { if e.complexity.JobsStatistics.TotalCoreHours == nil {
break break
@ -752,6 +769,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.JobsStatistics.TotalJobs(childComplexity), true return e.complexity.JobsStatistics.TotalJobs(childComplexity), true
case "JobsStatistics.totalNodeHours":
if e.complexity.JobsStatistics.TotalNodeHours == nil {
break
}
return e.complexity.JobsStatistics.TotalNodeHours(childComplexity), true
case "JobsStatistics.totalWalltime": case "JobsStatistics.totalWalltime":
if e.complexity.JobsStatistics.TotalWalltime == nil { if e.complexity.JobsStatistics.TotalWalltime == nil {
break break
@ -1764,10 +1788,13 @@ type HistoPoint {
type JobsStatistics { type JobsStatistics {
id: ID! # If ` + "`" + `groupBy` + "`" + ` was used, ID of the user/project/cluster id: ID! # If ` + "`" + `groupBy` + "`" + ` was used, ID of the user/project/cluster
name: String! # if User-Statistics: Given Name of Account (ID) Owner name: String! # if User-Statistics: Given Name of Account (ID) Owner
totalJobs: Int! # Number of jobs that matched totalJobs: Int! # Number of jobs
shortJobs: Int! # Number of jobs with a duration of less than 2 minutes runningJobs: Int! # Number of running jobs
shortJobs: Int! # Number of jobs with a duration of less than duration
totalWalltime: Int! # Sum of the duration of all matched jobs in hours totalWalltime: Int! # Sum of the duration of all matched jobs in hours
totalNodeHours: Int! # Sum of the node hours of all matched jobs
totalCoreHours: Int! # Sum of the core hours of all matched jobs totalCoreHours: Int! # Sum of the core hours of all matched jobs
totalAccHours: Int! # Sum of the gpu hours of all matched jobs
histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value
histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes
} }
@ -4884,6 +4911,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalJobs(ctx context.Co
return fc, nil return fc, nil
} }
// _JobsStatistics_runningJobs produces the response value for the runningJobs
// field of a JobsStatistics object. It reads obj.RunningJobs through the
// resolver middleware chain and marshals it with marshalNInt2int. On any
// failure (field context error, middleware error, nil result, or panic) it
// returns graphql.Null.
func (ec *executionContext) _JobsStatistics_runningJobs(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_JobsStatistics_runningJobs(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
// A panic below is reported as a GraphQL error and turned into a null result.
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.RunningJobs, nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
// A nil result is an error for this field; only report it if no field error
// was recorded already.
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(int)
fc.Result = res
return ec.marshalNInt2int(ctx, field.Selections, res)
}
// fieldContext_JobsStatistics_runningJobs builds the graphql.FieldContext for
// the runningJobs field of JobsStatistics. The field is marked as neither a
// method nor a resolver, and requesting a child field context always fails
// because Int has no child fields.
func (ec *executionContext) fieldContext_JobsStatistics_runningJobs(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "JobsStatistics",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Int does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _JobsStatistics_shortJobs(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { func (ec *executionContext) _JobsStatistics_shortJobs(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_JobsStatistics_shortJobs(ctx, field) fc, err := ec.fieldContext_JobsStatistics_shortJobs(ctx, field)
if err != nil { if err != nil {
@ -4972,6 +5043,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalWalltime(ctx contex
return fc, nil return fc, nil
} }
// _JobsStatistics_totalNodeHours produces the response value for the
// totalNodeHours field of a JobsStatistics object. It reads obj.TotalNodeHours
// through the resolver middleware chain and marshals it with marshalNInt2int.
// On any failure (field context error, middleware error, nil result, or panic)
// it returns graphql.Null.
func (ec *executionContext) _JobsStatistics_totalNodeHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_JobsStatistics_totalNodeHours(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
// A panic below is reported as a GraphQL error and turned into a null result.
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.TotalNodeHours, nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
// A nil result is an error for this field; only report it if no field error
// was recorded already.
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(int)
fc.Result = res
return ec.marshalNInt2int(ctx, field.Selections, res)
}
// fieldContext_JobsStatistics_totalNodeHours builds the graphql.FieldContext
// for the totalNodeHours field of JobsStatistics. The field is marked as
// neither a method nor a resolver, and requesting a child field context always
// fails because Int has no child fields.
func (ec *executionContext) fieldContext_JobsStatistics_totalNodeHours(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "JobsStatistics",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Int does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _JobsStatistics_totalCoreHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { func (ec *executionContext) _JobsStatistics_totalCoreHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field) fc, err := ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field)
if err != nil { if err != nil {
@ -5016,6 +5131,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalCoreHours(ctx conte
return fc, nil return fc, nil
} }
// _JobsStatistics_totalAccHours produces the response value for the
// totalAccHours field of a JobsStatistics object. It reads obj.TotalAccHours
// through the resolver middleware chain and marshals it with marshalNInt2int.
// On any failure (field context error, middleware error, nil result, or panic)
// it returns graphql.Null.
func (ec *executionContext) _JobsStatistics_totalAccHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_JobsStatistics_totalAccHours(ctx, field)
if err != nil {
return graphql.Null
}
ctx = graphql.WithFieldContext(ctx, fc)
// A panic below is reported as a GraphQL error and turned into a null result.
defer func() {
if r := recover(); r != nil {
ec.Error(ctx, ec.Recover(ctx, r))
ret = graphql.Null
}
}()
resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
ctx = rctx // use context from middleware stack in children
return obj.TotalAccHours, nil
})
if err != nil {
ec.Error(ctx, err)
return graphql.Null
}
// A nil result is an error for this field; only report it if no field error
// was recorded already.
if resTmp == nil {
if !graphql.HasFieldError(ctx, fc) {
ec.Errorf(ctx, "must not be null")
}
return graphql.Null
}
res := resTmp.(int)
fc.Result = res
return ec.marshalNInt2int(ctx, field.Selections, res)
}
// fieldContext_JobsStatistics_totalAccHours builds the graphql.FieldContext
// for the totalAccHours field of JobsStatistics. The field is marked as
// neither a method nor a resolver, and requesting a child field context always
// fails because Int has no child fields.
func (ec *executionContext) fieldContext_JobsStatistics_totalAccHours(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "JobsStatistics",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type Int does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _JobsStatistics_histDuration(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { func (ec *executionContext) _JobsStatistics_histDuration(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_JobsStatistics_histDuration(ctx, field) fc, err := ec.fieldContext_JobsStatistics_histDuration(ctx, field)
if err != nil { if err != nil {
@ -6867,12 +7026,18 @@ func (ec *executionContext) fieldContext_Query_jobsStatistics(ctx context.Contex
return ec.fieldContext_JobsStatistics_name(ctx, field) return ec.fieldContext_JobsStatistics_name(ctx, field)
case "totalJobs": case "totalJobs":
return ec.fieldContext_JobsStatistics_totalJobs(ctx, field) return ec.fieldContext_JobsStatistics_totalJobs(ctx, field)
case "runningJobs":
return ec.fieldContext_JobsStatistics_runningJobs(ctx, field)
case "shortJobs": case "shortJobs":
return ec.fieldContext_JobsStatistics_shortJobs(ctx, field) return ec.fieldContext_JobsStatistics_shortJobs(ctx, field)
case "totalWalltime": case "totalWalltime":
return ec.fieldContext_JobsStatistics_totalWalltime(ctx, field) return ec.fieldContext_JobsStatistics_totalWalltime(ctx, field)
case "totalNodeHours":
return ec.fieldContext_JobsStatistics_totalNodeHours(ctx, field)
case "totalCoreHours": case "totalCoreHours":
return ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field) return ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field)
case "totalAccHours":
return ec.fieldContext_JobsStatistics_totalAccHours(ctx, field)
case "histDuration": case "histDuration":
return ec.fieldContext_JobsStatistics_histDuration(ctx, field) return ec.fieldContext_JobsStatistics_histDuration(ctx, field)
case "histNumNodes": case "histNumNodes":
@ -12062,6 +12227,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti
out.Values[i] = ec._JobsStatistics_totalJobs(ctx, field, obj) out.Values[i] = ec._JobsStatistics_totalJobs(ctx, field, obj)
if out.Values[i] == graphql.Null {
invalids++
}
case "runningJobs":
out.Values[i] = ec._JobsStatistics_runningJobs(ctx, field, obj)
if out.Values[i] == graphql.Null { if out.Values[i] == graphql.Null {
invalids++ invalids++
} }
@ -12076,6 +12248,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti
out.Values[i] = ec._JobsStatistics_totalWalltime(ctx, field, obj) out.Values[i] = ec._JobsStatistics_totalWalltime(ctx, field, obj)
if out.Values[i] == graphql.Null {
invalids++
}
case "totalNodeHours":
out.Values[i] = ec._JobsStatistics_totalNodeHours(ctx, field, obj)
if out.Values[i] == graphql.Null { if out.Values[i] == graphql.Null {
invalids++ invalids++
} }
@ -12083,6 +12262,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti
out.Values[i] = ec._JobsStatistics_totalCoreHours(ctx, field, obj) out.Values[i] = ec._JobsStatistics_totalCoreHours(ctx, field, obj)
if out.Values[i] == graphql.Null {
invalids++
}
case "totalAccHours":
out.Values[i] = ec._JobsStatistics_totalAccHours(ctx, field, obj)
if out.Values[i] == graphql.Null { if out.Values[i] == graphql.Null {
invalids++ invalids++
} }

View File

@ -90,9 +90,12 @@ type JobsStatistics struct {
ID string `json:"id"` ID string `json:"id"`
Name string `json:"name"` Name string `json:"name"`
TotalJobs int `json:"totalJobs"` TotalJobs int `json:"totalJobs"`
RunningJobs int `json:"runningJobs"`
ShortJobs int `json:"shortJobs"` ShortJobs int `json:"shortJobs"`
TotalWalltime int `json:"totalWalltime"` TotalWalltime int `json:"totalWalltime"`
TotalNodeHours int `json:"totalNodeHours"`
TotalCoreHours int `json:"totalCoreHours"` TotalCoreHours int `json:"totalCoreHours"`
TotalAccHours int `json:"totalAccHours"`
HistDuration []*HistoPoint `json:"histDuration"` HistDuration []*HistoPoint `json:"histDuration"`
HistNumNodes []*HistoPoint `json:"histNumNodes"` HistNumNodes []*HistoPoint `json:"histNumNodes"`
} }

View File

@ -263,7 +263,51 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag
// JobsStatistics is the resolver for the jobsStatistics field. // JobsStatistics is the resolver for the jobsStatistics field.
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
return r.Repo.JobsStatistics(ctx, filter, groupBy) var err error
var stats []*model.JobsStatistics
if requireField(ctx, "totalJobs") {
if groupBy == nil {
stats, err = r.Repo.JobsStats(ctx, filter)
} else {
stats, err = r.Repo.JobsStatsGrouped(ctx, filter, groupBy)
}
} else {
stats = make([]*model.JobsStatistics, 0, 1)
}
if groupBy != nil {
if requireField(ctx, "shortJobs") {
stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short")
}
if requireField(ctx, "RunningJobs") {
stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running")
}
} else {
if requireField(ctx, "shortJobs") {
stats, err = r.Repo.AddJobCount(ctx, filter, stats, "short")
}
if requireField(ctx, "RunningJobs") {
stats, err = r.Repo.AddJobCount(ctx, filter, stats, "running")
}
}
if err != nil {
return nil, err
}
if requireField(ctx, "histDuration") || requireField(ctx, "histNumNodes") {
if groupBy == nil {
stats[0], err = r.Repo.AddHistograms(ctx, filter, stats[0])
if err != nil {
return nil, err
}
} else {
return nil, errors.New("histograms only implemented without groupBy argument")
}
}
return stats, nil
} }
// JobsCount is the resolver for the jobsCount field. // JobsCount is the resolver for the jobsCount field.

View File

@ -10,6 +10,7 @@ import (
"fmt" "fmt"
"math" "math"
"github.com/99designs/gqlgen/graphql"
"github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/log"
@ -132,3 +133,15 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
Metrics: res, Metrics: res,
}, nil }, nil
} }
// requireField reports whether the field called name is among the fields
// returned by graphql.CollectAllFields for the current resolver context.
// The comparison is an exact, case-sensitive string match.
func requireField(ctx context.Context, name string) bool {
	selected := graphql.CollectAllFields(ctx)

	for i := 0; i < len(selected); i++ {
		if selected[i] == name {
			return true
		}
	}

	return false
}

View File

@ -6,10 +6,10 @@ package repository
import ( import (
"database/sql" "database/sql"
"log"
"sync" "sync"
"time" "time"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/mattn/go-sqlite3" "github.com/mattn/go-sqlite3"
"github.com/qustavo/sqlhooks/v2" "github.com/qustavo/sqlhooks/v2"
@ -48,15 +48,17 @@ func Connect(driver string, db string) {
switch driver { switch driver {
case "sqlite3": case "sqlite3":
sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{}))
// - Set WAL mode (not strictly necessary each time because it's persisted in the database, but good for first run) // - Set WAL mode (not strictly necessary each time because it's persisted in the database, but good for first run)
// - Set busy timeout, so concurrent writers wait on each other instead of erroring immediately // - Set busy timeout, so concurrent writers wait on each other instead of erroring immediately
// - Enable foreign key checks // - Enable foreign key checks
opts.URL += "?_journal=WAL&_timeout=5000&_fk=true" opts.URL += "?_journal=WAL&_timeout=5000&_fk=true"
if log.Loglevel() == "debug" {
sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{}))
dbHandle, err = sqlx.Open("sqlite3WithHooks", opts.URL) dbHandle, err = sqlx.Open("sqlite3WithHooks", opts.URL)
// dbHandle, err = sqlx.Open("sqlite3", opts.URL) } else {
dbHandle, err = sqlx.Open("sqlite3", opts.URL)
}
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -14,12 +14,9 @@ import (
"sync" "sync"
"time" "time"
"github.com/99designs/gqlgen/graphql"
"github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/pkg/schema"
@ -747,242 +744,6 @@ func (r *JobRepository) FindJobsBefore(startTime int64) ([]*schema.Job, error) {
return jobs, nil return jobs, nil
} }
// groupBy2column maps a GraphQL Aggregate value to the job table column that
// is used for grouping.
// GraphQL validation should make sure that no unknown values can be specified.
var groupBy2column = map[model.Aggregate]string{
model.AggregateUser: "job.user",
model.AggregateProject: "job.project",
model.AggregateCluster: "job.cluster",
}
// JobsStatistics is a helper for the jobsStatistics GraphQL query, placed here
// so that schema.resolvers.go is not too full.
//
// It runs one aggregate query per cluster/subcluster pair (job count, walltime
// hours and core hours), merges the rows into one entry per group id, then
// adds the short-job counts and, if the GraphQL selection asks for them, the
// duration and node-count histograms.
//
// With groupBy == nil a single entry with the empty string as ID is produced.
// Errors from the security check, the queries, or row scanning are returned
// with a nil result.
func (r *JobRepository) JobsStatistics(ctx context.Context,
filter []*model.JobFilter,
groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
start := time.Now()
// In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string)
stats := map[string]*model.JobsStatistics{}
// SQL type name used in CAST expressions; stays empty for any other driver.
var castType string
switch r.driver {
case "sqlite3":
castType = "int"
case "mysql":
castType = "unsigned"
}
// `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
for _, cluster := range archive.Clusters {
for _, subcluster := range cluster.SubClusters {
// Core hours = duration * nodes * sockets per node * cores per socket, converted from seconds to hours.
corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType)
var rawQuery sq.SelectBuilder
if groupBy == nil {
// Ungrouped: select a constant empty id so both variants scan the same four columns.
rawQuery = sq.Select(
"''",
"COUNT(job.id)",
fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
corehoursCol,
).From("job")
} else {
col := groupBy2column[*groupBy]
rawQuery = sq.Select(
col,
"COUNT(job.id)",
fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
corehoursCol,
).From("job").GroupBy(col)
}
rawQuery = rawQuery.
Where("job.cluster = ?", cluster.Name).
Where("job.subcluster = ?", subcluster.Name)
query, qerr := SecurityCheck(ctx, rawQuery)
if qerr != nil {
return nil, qerr
}
for _, f := range filter {
query = BuildWhereClause(f, query)
}
// NOTE(review): rows is never closed explicitly; an early return from the scan loop leaves it open.
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying DB for job statistics")
return nil, err
}
for rows.Next() {
var id sql.NullString
var jobs, walltime, corehours sql.NullInt64
if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
// Accumulate into the entry for this id across all cluster/subcluster queries.
if id.Valid {
if s, ok := stats[id.String]; ok {
s.TotalJobs += int(jobs.Int64)
s.TotalWalltime += int(walltime.Int64)
s.TotalCoreHours += int(corehours.Int64)
} else {
stats[id.String] = &model.JobsStatistics{
ID: id.String,
TotalJobs: int(jobs.Int64),
TotalWalltime: int(walltime.Int64),
TotalCoreHours: int(corehours.Int64),
}
}
}
}
}
}
// Second pass: count the jobs shorter than config.Keys.ShortRunningJobsDuration.
if groupBy == nil {
query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < ?", config.Keys.ShortRunningJobsDuration)
query, qerr := SecurityCheck(ctx, query)
if qerr != nil {
return nil, qerr
}
for _, f := range filter {
query = BuildWhereClause(f, query)
}
// NOTE(review): stats[""] is only set by the loop above; if that loop produced no entry this dereferences nil.
if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil {
log.Warn("Error while scanning rows for short job stats")
return nil, err
}
} else {
col := groupBy2column[*groupBy]
// NOTE(review): no GroupBy(col) is applied to this query although one row per id is scanned below - confirm.
query := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < ?", config.Keys.ShortRunningJobsDuration)
query, qerr := SecurityCheck(ctx, query)
if qerr != nil {
return nil, qerr
}
for _, f := range filter {
query = BuildWhereClause(f, query)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying jobs for short jobs")
return nil, err
}
for rows.Next() {
var id sql.NullString
var shortJobs sql.NullInt64
if err := rows.Scan(&id, &shortJobs); err != nil {
log.Warn("Error while scanning rows for short jobs")
return nil, err
}
// NOTE(review): assumes the id already exists in stats; an id missing from the first pass would dereference nil.
if id.Valid {
stats[id.String].ShortJobs = int(shortJobs.Int64)
}
}
// When grouping by user, look up a display name for every entry; "-" is used when none is found.
if col == "job.user" {
for id := range stats {
emptyDash := "-"
user := auth.GetUser(ctx)
name, _ := r.FindColumnValue(user, id, "user", "name", "username", false)
if name != "" {
stats[id].Name = name
} else {
stats[id].Name = emptyDash
}
}
}
}
// Calculating the histogram data is expensive, so only do it if needed.
// An explicit resolver can not be used because we need to know the filters.
histogramsNeeded := false
fields := graphql.CollectFieldsCtx(ctx, nil)
for _, col := range fields {
if col.Name == "histDuration" || col.Name == "histNumNodes" {
histogramsNeeded = true
}
}
// Flatten the map into the result slice; the order follows map iteration and is therefore not stable.
res := make([]*model.JobsStatistics, 0, len(stats))
for _, stat := range stats {
res = append(res, stat)
id, col := "", ""
if groupBy != nil {
id = stat.ID
col = groupBy2column[*groupBy]
}
if histogramsNeeded {
var err error
// Running jobs use the time elapsed since start_time (relative to now) instead of the stored duration.
value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter, id, col)
if err != nil {
log.Warn("Error while loading job statistics histogram: running jobs")
return nil, err
}
stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, id, col)
if err != nil {
log.Warn("Error while loading job statistics histogram: num nodes")
return nil, err
}
}
}
log.Infof("Timer JobStatistics %s", time.Since(start))
return res, nil
}
// jobsStatisticsHistogram counts jobs per distinct value of a column
// expression and returns one HistoPoint per value.
//
// `value` must be the column grouped by, but renamed to "value". `id` and `col`
// can optionally be used to add a condition to the query of the kind
// "<col> = <id>"; the condition is only added when both are non-empty.
//
// The query passes through SecurityCheck and all filters are applied. Any
// error from the security check, the query, row scanning, or row iteration is
// returned with a nil result.
func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context,
	value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) {

	start := time.Now()
	query, qerr := SecurityCheck(ctx, sq.Select(value, "COUNT(job.id) AS count").From("job"))
	if qerr != nil {
		return nil, qerr
	}

	for _, f := range filters {
		query = BuildWhereClause(f, query)
	}

	if len(id) != 0 && len(col) != 0 {
		query = query.Where(col+" = ?", id)
	}

	rows, err := query.GroupBy("value").RunWith(r.DB).Query()
	if err != nil {
		log.Error("Error while running query")
		return nil, err
	}
	// Release the result set on every return path, including scan errors.
	defer rows.Close()

	points := make([]*model.HistoPoint, 0)
	for rows.Next() {
		point := model.HistoPoint{}
		if err := rows.Scan(&point.Value, &point.Count); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}

		points = append(points, &point)
	}
	// rows.Next returning false can also mean the iteration failed.
	if err := rows.Err(); err != nil {
		log.Warn("Error while iterating over rows")
		return nil, err
	}

	log.Infof("Timer jobsStatisticsHistogram %s", time.Since(start))
	return points, nil
}
const NamedJobInsert string = `INSERT INTO job ( const NamedJobInsert string = `INSERT INTO job (
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc, job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data, exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
@ -1007,96 +768,3 @@ func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
return id, nil return id, nil
} }
// Transaction bundles an open sqlx transaction with the named statement that
// was prepared on it.
type Transaction struct {
tx *sqlx.Tx // currently open transaction
stmt *sqlx.NamedStmt // named statement bound to tx
}
// TransactionInit opens a new database transaction and prepares the
// NamedJobInsert statement on it.
//
// It returns the Transaction handle, or nil and the error if the transaction
// could not be started or the statement could not be prepared. When preparing
// fails, the already opened transaction is rolled back so it is not leaked.
func (r *JobRepository) TransactionInit() (*Transaction, error) {
	var err error
	t := new(Transaction)

	// Inserts are bundled into transactions because in sqlite,
	// that speeds up inserts A LOT.
	t.tx, err = r.DB.Beginx()
	if err != nil {
		log.Warn("Error while bundling transactions")
		return nil, err
	}

	t.stmt, err = t.tx.PrepareNamed(NamedJobInsert)
	if err != nil {
		log.Warn("Error while preparing namedJobInsert")
		// The caller never sees t, so nobody else could end this transaction.
		if rerr := t.tx.Rollback(); rerr != nil {
			log.Errorf("Error while rolling back transaction: %v", rerr)
		}
		return nil, err
	}

	return t, nil
}
// TransactionCommit commits the transaction currently held by t (if any),
// opens a fresh one, and rebinds t's named statement to the new transaction.
// The first error encountered is logged and returned.
func (r *JobRepository) TransactionCommit(t *Transaction) error {
	if t.tx != nil {
		if commitErr := t.tx.Commit(); commitErr != nil {
			log.Warn("Error while committing transactions")
			return commitErr
		}
	}

	// Store the result of Beginx in t.tx even on failure.
	next, beginErr := r.DB.Beginx()
	t.tx = next
	if beginErr != nil {
		log.Warn("Error while bundling transactions")
		return beginErr
	}

	t.stmt = next.NamedStmt(t.stmt)
	return nil
}
// TransactionEnd commits the transaction held by t. A commit error is logged
// and returned.
func (r *JobRepository) TransactionEnd(t *Transaction) error {
	err := t.tx.Commit()
	if err == nil {
		return nil
	}

	log.Warn("Error while committing SQL transactions")
	return err
}
// TransactionAdd executes t's prepared named statement with job as argument
// and returns the last insert ID reported by the database. On failure the
// error is logged and returned together with 0.
func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, error) {
	result, execErr := t.stmt.Exec(job)
	if execErr != nil {
		log.Errorf("repository initDB(): %v", execErr)
		return 0, execErr
	}

	insertID, idErr := result.LastInsertId()
	if idErr != nil {
		log.Errorf("repository initDB(): %v", idErr)
		return 0, idErr
	}

	return insertID, nil
}
// TransactionAddTag inserts tag (name and type) into the tag table within the
// transaction held by t and returns the last insert ID reported by the
// database. On failure the problem is logged and 0 plus the error is returned.
func (r *JobRepository) TransactionAddTag(t *Transaction, tag *schema.Tag) (int64, error) {
	result, execErr := t.tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
	if execErr != nil {
		log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
		return 0, execErr
	}

	newTagID, idErr := result.LastInsertId()
	if idErr != nil {
		log.Warn("Error while getting last insert ID")
		return 0, idErr
	}

	return newTagID, nil
}
// TransactionSetTag inserts the (jobId, tagId) pair into the jobtag table
// within the transaction held by t. A failing insert is logged and its error
// returned.
func (r *JobRepository) TransactionSetTag(t *Transaction, jobId int64, tagId int64) error {
	_, err := t.tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobId, tagId)
	if err == nil {
		return nil
	}

	log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", jobId, tagId)
	return err
}

View File

@ -0,0 +1,411 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/log"
sq "github.com/Masterminds/squirrel"
)
// groupBy2column maps a GraphQL Aggregate value to the job table column that
// is used for grouping.
// GraphQL validation should make sure that no unknown values can be specified.
var groupBy2column = map[model.Aggregate]string{
model.AggregateUser: "job.user",
model.AggregateProject: "job.project",
model.AggregateCluster: "job.cluster",
}
// buildCountQuery assembles a job count query.
//
// With an empty col the query selects a single COUNT(job.id); otherwise it
// selects col plus the count and groups by col. kind restricts the counted
// jobs: "running" keeps jobs whose job_state is "running", "short" keeps jobs
// with a duration below config.Keys.ShortRunningJobsDuration, and any other
// value adds no restriction. All filters are applied afterwards.
func (r *JobRepository) buildCountQuery(
	filter []*model.JobFilter,
	kind string,
	col string) sq.SelectBuilder {

	// Scan columns: cnt
	query := sq.Select("COUNT(job.id)").From("job")
	if col != "" {
		// Scan columns: id, cnt
		query = sq.Select(col, "COUNT(job.id)").From("job").GroupBy(col)
	}

	if kind == "running" {
		query = query.Where("job.job_state = ?", "running")
	} else if kind == "short" {
		query = query.Where("job.duration < ?", config.Keys.ShortRunningJobsDuration)
	}

	for i := range filter {
		query = BuildWhereClause(filter[i], query)
	}

	return query
}
// buildStatsQuery assembles the aggregate statistics query over the job table.
//
// The selected columns are, in order: totalJobs, totalWalltime,
// totalNodeHours, totalCoreHours, totalAccHours. The hour values are sums of
// duration (times num_nodes, num_hwthreads or num_acc) divided by 3600 and
// cast to the driver-specific integer type. When col is non-empty it is
// selected as an additional leading id column and the query is grouped by it.
// All filters are applied afterwards.
func (r *JobRepository) buildStatsQuery(
	filter []*model.JobFilter,
	col string) sq.SelectBuilder {

	castType := r.getCastType()

	// Scan columns: totalJobs, totalWalltime, totalNodeHours, totalCoreHours, totalAccHours
	columns := []string{
		"COUNT(job.id)",
		fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
		fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType),
		fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType),
		fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s)", castType),
	}

	grouped := col != ""
	if grouped {
		// Scan columns: id, followed by the columns above
		columns = append([]string{col}, columns...)
	}

	query := sq.Select(columns...).From("job")
	if grouped {
		query = query.GroupBy(col)
	}

	for i := range filter {
		query = BuildWhereClause(filter[i], query)
	}

	return query
}
// getUserName looks up the "name" column of the user table for the given
// username id, on behalf of the user stored in ctx. It returns "-" when the
// lookup yields an empty name; the lookup error itself is discarded.
func (r *JobRepository) getUserName(ctx context.Context, id string) string {
	name, _ := r.FindColumnValue(auth.GetUser(ctx), id, "user", "name", "username", false)
	if name == "" {
		return "-"
	}

	return name
}
// getCastType returns the SQL type name used in CAST expressions for the
// repository's database driver: "int" for sqlite3, "unsigned" for mysql, and
// the empty string for any other driver.
func (r *JobRepository) getCastType() string {
	switch r.driver {
	case "sqlite3":
		return "int"
	case "mysql":
		return "unsigned"
	}

	return ""
}
// JobsStatsGrouped returns one JobsStatistics entry per distinct value of the
// column selected by groupBy, restricted to the jobs matching filter and to
// what SecurityCheck permits for the user in ctx.
//
// Each entry carries the group key as ID together with the job count and the
// summed walltime, node hours, core hours and accelerator hours. When grouping
// by job.user the Name field is additionally resolved through getUserName.
// Rows whose group key is NULL are skipped.
func (r *JobRepository) JobsStatsGrouped(
	ctx context.Context,
	filter []*model.JobFilter,
	groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {

	start := time.Now()
	col := groupBy2column[*groupBy]
	query := r.buildStatsQuery(filter, col)
	query, err := SecurityCheck(ctx, query)
	if err != nil {
		return nil, err
	}

	rows, err := query.RunWith(r.DB).Query()
	if err != nil {
		log.Warn("Error while querying DB for job statistics")
		return nil, err
	}
	// Release the underlying connection on every return path.
	defer rows.Close()

	stats := make([]*model.JobsStatistics, 0, 100)

	for rows.Next() {
		var id sql.NullString
		var jobs, walltime, nodeHours, coreHours, accHours sql.NullInt64
		if err := rows.Scan(&id, &jobs, &walltime, &nodeHours, &coreHours, &accHours); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}

		if !id.Valid {
			continue
		}

		// sql.NullInt64 holds 0 when the scanned value was NULL, so the
		// Int64 fields can be used directly without checking Valid.
		stat := &model.JobsStatistics{
			ID:             id.String,
			TotalJobs:      int(jobs.Int64),
			TotalWalltime:  int(walltime.Int64),
			TotalNodeHours: int(nodeHours.Int64),
			TotalCoreHours: int(coreHours.Int64),
			TotalAccHours:  int(accHours.Int64),
		}
		if col == "job.user" {
			stat.Name = r.getUserName(ctx, id.String)
		}
		stats = append(stats, stat)
	}
	// rows.Next() also returns false on an iteration error; surface it
	// instead of returning a silently truncated result.
	if err := rows.Err(); err != nil {
		log.Warn("Error while iterating rows")
		return nil, err
	}

	log.Infof("Timer JobsStatsGrouped %s", time.Since(start))
	return stats, nil
}
// JobsStats returns the ungrouped statistics over all jobs matching filter
// that SecurityCheck permits for the user in ctx.
//
// The result holds at most one entry with the job count and the summed
// walltime, node hours, core hours and accelerator hours. No entry is
// appended when the job count scans as NULL.
func (r *JobRepository) JobsStats(
	ctx context.Context,
	filter []*model.JobFilter) ([]*model.JobsStatistics, error) {

	start := time.Now()
	query := r.buildStatsQuery(filter, "")
	query, err := SecurityCheck(ctx, query)
	if err != nil {
		return nil, err
	}

	var jobs, walltime, nodeHours, coreHours, accHours sql.NullInt64
	row := query.RunWith(r.DB).QueryRow()
	if err := row.Scan(&jobs, &walltime, &nodeHours, &coreHours, &accHours); err != nil {
		log.Warn("Error while scanning rows")
		return nil, err
	}

	stats := make([]*model.JobsStatistics, 0, 1)
	if jobs.Valid {
		// sql.NullInt64 holds 0 when the scanned value was NULL (e.g. SUM
		// over zero rows), so Int64 can be used without checking Valid.
		stats = append(stats,
			&model.JobsStatistics{
				TotalJobs:      int(jobs.Int64),
				TotalWalltime:  int(walltime.Int64),
				TotalNodeHours: int(nodeHours.Int64),
				TotalCoreHours: int(coreHours.Int64),
				TotalAccHours:  int(accHours.Int64),
			})
	}

	log.Infof("Timer JobStats %s", time.Since(start))
	return stats, nil
}
// JobCountGrouped returns one JobsStatistics entry per distinct value of the
// column selected by groupBy, with ID set to the group key and TotalJobs set
// to the number of matching jobs. Only jobs matching filter and permitted by
// SecurityCheck for the user in ctx are counted. Rows whose group key is NULL
// are skipped.
func (r *JobRepository) JobCountGrouped(
	ctx context.Context,
	filter []*model.JobFilter,
	groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {

	start := time.Now()
	col := groupBy2column[*groupBy]
	query := r.buildCountQuery(filter, "", col)
	query, err := SecurityCheck(ctx, query)
	if err != nil {
		return nil, err
	}

	rows, err := query.RunWith(r.DB).Query()
	if err != nil {
		log.Warn("Error while querying DB for job statistics")
		return nil, err
	}
	// Release the underlying connection on every return path.
	defer rows.Close()

	stats := make([]*model.JobsStatistics, 0, 100)

	for rows.Next() {
		var id sql.NullString
		var cnt sql.NullInt64
		if err := rows.Scan(&id, &cnt); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}
		if id.Valid {
			stats = append(stats,
				&model.JobsStatistics{
					ID:        id.String,
					TotalJobs: int(cnt.Int64)})
		}
	}
	// rows.Next() also returns false on an iteration error; surface it
	// instead of returning a silently truncated result.
	if err := rows.Err(); err != nil {
		log.Warn("Error while iterating rows")
		return nil, err
	}

	log.Infof("Timer JobCountGrouped %s", time.Since(start))
	return stats, nil
}
// AddJobCountGrouped counts jobs per distinct value of the column selected by
// groupBy and stores the counts in the entries of stats, matched by ID.
//
// kind selects both the count query variant passed to buildCountQuery and the
// field that is written: "running" fills RunningJobs, "short" fills ShortJobs.
// Entries whose ID has no matching group receive 0. For any other kind the
// query still runs but stats is returned unmodified.
func (r *JobRepository) AddJobCountGrouped(
	ctx context.Context,
	filter []*model.JobFilter,
	groupBy *model.Aggregate,
	stats []*model.JobsStatistics,
	kind string) ([]*model.JobsStatistics, error) {

	start := time.Now()
	col := groupBy2column[*groupBy]
	query := r.buildCountQuery(filter, kind, col)
	query, err := SecurityCheck(ctx, query)
	if err != nil {
		return nil, err
	}

	rows, err := query.RunWith(r.DB).Query()
	if err != nil {
		log.Warn("Error while querying DB for job statistics")
		return nil, err
	}
	// Release the underlying connection on every return path.
	defer rows.Close()

	counts := make(map[string]int)

	for rows.Next() {
		var id sql.NullString
		var cnt sql.NullInt64
		if err := rows.Scan(&id, &cnt); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}
		if id.Valid {
			counts[id.String] = int(cnt.Int64)
		}
	}
	// rows.Next() also returns false on an iteration error; surface it
	// instead of writing partial counts into stats.
	if err := rows.Err(); err != nil {
		log.Warn("Error while iterating rows")
		return nil, err
	}

	switch kind {
	case "running":
		for _, s := range stats {
			s.RunningJobs = counts[s.ID]
		}
	case "short":
		for _, s := range stats {
			s.ShortJobs = counts[s.ID]
		}
	}

	log.Infof("Timer AddJobCountGrouped %s", time.Since(start))
	return stats, nil
}
// AddJobCount runs the ungrouped count query for the given kind and stores
// the resulting count in every entry of stats.
//
// kind selects both the count query variant passed to buildCountQuery and the
// field that is written: "running" fills RunningJobs, "short" fills ShortJobs.
// For any other kind the query still runs but stats is returned unmodified.
func (r *JobRepository) AddJobCount(
	ctx context.Context,
	filter []*model.JobFilter,
	stats []*model.JobsStatistics,
	kind string) ([]*model.JobsStatistics, error) {

	start := time.Now()
	query := r.buildCountQuery(filter, kind, "")
	query, err := SecurityCheck(ctx, query)
	if err != nil {
		return nil, err
	}

	rows, err := query.RunWith(r.DB).Query()
	if err != nil {
		log.Warn("Error while querying DB for job statistics")
		return nil, err
	}
	// Release the underlying connection on every return path.
	defer rows.Close()

	// The ungrouped query yields a single count. It must be kept in a plain
	// variable: looking it up by stats ID (as for the grouped variant) can
	// never match and would always write 0.
	var count int

	for rows.Next() {
		var cnt sql.NullInt64
		if err := rows.Scan(&cnt); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}
		count = int(cnt.Int64)
	}
	if err := rows.Err(); err != nil {
		log.Warn("Error while iterating rows")
		return nil, err
	}

	switch kind {
	case "running":
		for _, s := range stats {
			s.RunningJobs = count
		}
	case "short":
		for _, s := range stats {
			s.ShortJobs = count
		}
	}

	log.Infof("Timer AddJobCount %s", time.Since(start))
	return stats, nil
}
// AddHistograms fills stat.HistDuration and stat.HistNumNodes for the jobs
// matching filter and returns stat.
//
// The duration histogram buckets jobs by rounded hours; for jobs in state
// "running" the elapsed time since job.start_time is used instead of
// job.duration. The node histogram buckets jobs by job.num_nodes.
// On failure nil and the error are returned.
func (r *JobRepository) AddHistograms(
	ctx context.Context,
	filter []*model.JobFilter,
	stat *model.JobsStatistics) (*model.JobsStatistics, error) {

	start := time.Now()

	var err error
	durationExpr := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), r.getCastType())

	if stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, durationExpr, filter); err != nil {
		log.Warn("Error while loading job statistics histogram: running jobs")
		return nil, err
	}

	if stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter); err != nil {
		log.Warn("Error while loading job statistics histogram: num nodes")
		return nil, err
	}

	log.Infof("Timer AddHistograms %s", time.Since(start))
	return stat, nil
}
// jobsStatisticsHistogram counts jobs per distinct value of the given SQL
// expression and returns one HistoPoint per group.
//
// `value` must be the column grouped by, but renamed to "value" (the query
// groups by the alias "value"). Only jobs matching filters and permitted by
// SecurityCheck for the user in ctx are counted.
func (r *JobRepository) jobsStatisticsHistogram(
	ctx context.Context,
	value string,
	filters []*model.JobFilter) ([]*model.HistoPoint, error) {

	start := time.Now()
	query, qerr := SecurityCheck(ctx,
		sq.Select(value, "COUNT(job.id) AS count").From("job"))

	if qerr != nil {
		return nil, qerr
	}

	for _, f := range filters {
		query = BuildWhereClause(f, query)
	}

	rows, err := query.GroupBy("value").RunWith(r.DB).Query()
	if err != nil {
		log.Error("Error while running query")
		return nil, err
	}
	// Release the underlying connection on every return path.
	defer rows.Close()

	points := make([]*model.HistoPoint, 0)
	for rows.Next() {
		point := model.HistoPoint{}
		if err := rows.Scan(&point.Value, &point.Count); err != nil {
			log.Warn("Error while scanning rows")
			return nil, err
		}

		points = append(points, &point)
	}
	// rows.Next() also returns false on an iteration error; surface it
	// instead of returning a silently truncated histogram.
	if err := rows.Err(); err != nil {
		log.Warn("Error while iterating rows")
		return nil, err
	}

	log.Infof("Timer jobsStatisticsHistogram %s", time.Since(start))
	return points, nil
}

View File

@ -0,0 +1,21 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"fmt"
"testing"
)
// TestBuildJobStatsQuery checks that the grouped statistics query can be
// rendered to SQL without error and prints the generated statement.
func TestBuildJobStatsQuery(t *testing.T) {
	repo := setup(t)

	stmt, _, err := repo.buildStatsQuery(nil, "USER").ToSql()
	noErr(t, err)

	fmt.Printf("SQL: %s\n", stmt)
}

View File

@ -0,0 +1,104 @@
// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package repository
import (
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/jmoiron/sqlx"
)
// Transaction bundles an open database transaction with the named insert
// statement that is executed on it. Instances are created by
// JobRepository.TransactionInit.
type Transaction struct {
	// tx is the currently open transaction; TransactionCommit replaces it
	// with a fresh one after committing.
	tx *sqlx.Tx
	// stmt is the statement prepared from NamedJobInsert, bound to tx.
	stmt *sqlx.NamedStmt
}
// TransactionInit opens a new database transaction and prepares the
// NamedJobInsert statement on it.
//
// It returns the ready-to-use Transaction, or nil and the error when either
// step fails. If preparing the statement fails, the transaction that was just
// opened is rolled back so it does not stay open.
func (r *JobRepository) TransactionInit() (*Transaction, error) {
	var err error
	t := new(Transaction)
	// Inserts are bundled into transactions because in sqlite,
	// that speeds up inserts A LOT.
	t.tx, err = r.DB.Beginx()
	if err != nil {
		log.Warn("Error while bundling transactions")
		return nil, err
	}

	t.stmt, err = t.tx.PrepareNamed(NamedJobInsert)
	if err != nil {
		log.Warn("Error while preparing namedJobInsert")
		// The caller never receives t on this path, so nobody else could
		// end the transaction; roll it back here.
		if rbErr := t.tx.Rollback(); rbErr != nil {
			log.Warnf("Error while rolling back transaction: %v", rbErr)
		}
		return nil, err
	}

	return t, nil
}
// TransactionCommit commits the transaction currently held by t (if any),
// then opens a new one and rebinds t's prepared statement to it, so that t
// can be used for further inserts.
func (r *JobRepository) TransactionCommit(t *Transaction) error {
	if t.tx != nil {
		if commitErr := t.tx.Commit(); commitErr != nil {
			log.Warn("Error while committing transactions")
			return commitErr
		}
	}

	var beginErr error
	if t.tx, beginErr = r.DB.Beginx(); beginErr != nil {
		log.Warn("Error while bundling transactions")
		return beginErr
	}

	// Re-associate the already prepared statement with the new transaction.
	t.stmt = t.tx.NamedStmt(t.stmt)
	return nil
}
// TransactionEnd commits the transaction held by t without opening a new one.
// It returns the commit error, if any.
func (r *JobRepository) TransactionEnd(t *Transaction) error {
	err := t.tx.Commit()
	if err != nil {
		log.Warn("Error while committing SQL transactions")
	}
	return err
}
// TransactionAdd executes t's prepared named insert statement with job as
// its argument and returns the ID reported by LastInsertId.
// On failure it returns 0 and the error.
func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, error) {
	res, err := t.stmt.Exec(job)
	if err != nil {
		// Name this function in the log; the previous message pointed at
		// initDB(), which is not where the failure happens.
		log.Errorf("repository TransactionAdd(): inserting job: %v", err)
		return 0, err
	}

	id, err := res.LastInsertId()
	if err != nil {
		log.Errorf("repository TransactionAdd(): reading last insert ID: %v", err)
		return 0, err
	}

	return id, nil
}
// TransactionAddTag inserts tag (name and type) into the tag table within
// t's transaction and returns the ID reported by LastInsertId.
// On failure it returns 0 and the error.
func (r *JobRepository) TransactionAddTag(t *Transaction, tag *schema.Tag) (int64, error) {
	result, execErr := t.tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
	if execErr != nil {
		log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
		return 0, execErr
	}

	tagId, idErr := result.LastInsertId()
	if idErr != nil {
		log.Warn("Error while getting last insert ID")
		return 0, idErr
	}

	return tagId, nil
}
// TransactionSetTag links the job jobId to the tag tagId by inserting a row
// into the jobtag table within t's transaction.
func (r *JobRepository) TransactionSetTag(t *Transaction, jobId int64, tagId int64) error {
	_, err := t.tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobId, tagId)
	if err != nil {
		log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", jobId, tagId)
	}
	return err
}

View File

@ -14,12 +14,9 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/api" "github.com/ClusterCockpit/cc-backend/internal/api"
"github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/auth"
"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/web" "github.com/ClusterCockpit/cc-backend/web"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
@ -50,47 +47,20 @@ var routes []Route = []Route{
} }
func setupHomeRoute(i InfoType, r *http.Request) InfoType { func setupHomeRoute(i InfoType, r *http.Request) InfoType {
type cluster struct {
Name string
RunningJobs int
TotalJobs int
RecentShortJobs int
}
jobRepo := repository.GetJobRepository() jobRepo := repository.GetJobRepository()
groupBy := model.AggregateCluster
runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{ stats, err := jobRepo.JobCountGrouped(r.Context(), nil, &groupBy)
State: []schema.JobState{schema.JobStateRunning},
}}, nil, nil)
if err != nil { if err != nil {
log.Warnf("failed to count jobs: %s", err.Error()) log.Warnf("failed to count jobs: %s", err.Error())
runningJobs = map[string]int{}
}
totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil, nil)
if err != nil {
log.Warnf("failed to count jobs: %s", err.Error())
totalJobs = map[string]int{}
}
from := time.Now().Add(-24 * time.Hour)
recentShortJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
StartTime: &schema.TimeRange{From: &from, To: nil},
Duration: &schema.IntRange{From: 0, To: config.Keys.ShortRunningJobsDuration},
}}, nil, nil)
if err != nil {
log.Warnf("failed to count jobs: %s", err.Error())
recentShortJobs = map[string]int{}
} }
clusters := make([]cluster, 0) stats, err = jobRepo.AddJobCountGrouped(r.Context(), nil, &groupBy, stats, "running")
for _, c := range archive.Clusters { if err != nil {
clusters = append(clusters, cluster{ log.Warnf("failed to count running jobs: %s", err.Error())
Name: c.Name,
RunningJobs: runningJobs[c.Name],
TotalJobs: totalJobs[c.Name],
RecentShortJobs: recentShortJobs[c.Name],
})
} }
i["clusters"] = clusters i["clusters"] = stats
return i return i
} }

View File

@ -41,6 +41,8 @@ var (
CritLog *log.Logger = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile) CritLog *log.Logger = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile)
) )
var loglevel string = "info"
/* CONFIG */ /* CONFIG */
func Init(lvl string, logdate bool) { func Init(lvl string, logdate bool) {
@ -78,6 +80,8 @@ func Init(lvl string, logdate bool) {
ErrLog = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile) ErrLog = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile)
CritLog = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile) CritLog = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile)
} }
loglevel = lvl
} }
/* PRINT */ /* PRINT */
@ -170,6 +174,10 @@ func Fatalf(format string, v ...interface{}) {
os.Exit(1) os.Exit(1)
} }
func Loglevel() string {
return loglevel
}
/* SPECIAL */ /* SPECIAL */
// func Finfof(w io.Writer, format string, v ...interface{}) { // func Finfof(w io.Writer, format string, v ...interface{}) {

View File

@ -25,6 +25,7 @@ type BaseJob struct {
Partition string `json:"partition,omitempty" db:"partition" example:"main"` // The Slurm partition to which the job was submitted Partition string `json:"partition,omitempty" db:"partition" example:"main"` // The Slurm partition to which the job was submitted
ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` // The unique identifier of an array job ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` // The unique identifier of an array job
NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0) NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0)
// NumCores int32 `json:"numCores" db:"num_cores" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
NumHWThreads int32 `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0) NumHWThreads int32 `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
NumAcc int32 `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0) NumAcc int32 `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0)
Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user

View File

@ -65,7 +65,7 @@
histNumNodes { count, value } histNumNodes { count, value }
} }
topUsers: jobsCount(filter: $filters, groupBy: USER, weight: NODE_HOURS, limit: 5) { name, count } topUsers: jobsCount(filter: $jobFilters, groupBy: USER, weight: NODE_HOURS, limit: 5) { name, count }
} }
`, `,
variables: { jobFilters } variables: { jobFilters }
@ -143,7 +143,7 @@
<td>{$statsQuery.data.stats[0].totalJobs}</td> <td>{$statsQuery.data.stats[0].totalJobs}</td>
</tr> </tr>
<tr> <tr>
<th scope="col">Short Jobs (&#60; 2m)</th> <th scope="col">Short Jobs</th>
<td>{$statsQuery.data.stats[0].shortJobs}</td> <td>{$statsQuery.data.stats[0].shortJobs}</td>
</tr> </tr>
<tr> <tr>
@ -160,7 +160,7 @@
{#key $statsQuery.data.topUsers} {#key $statsQuery.data.topUsers}
<h4>Top Users (by node hours)</h4> <h4>Top Users (by node hours)</h4>
<Histogram <Histogram
width={colWidth - 25} height={300 * 0.5} width={colWidth - 25} height={300 * 0.5} small={true}
data={$statsQuery.data.topUsers.sort((a, b) => b.count - a.count).map(({ count }, idx) => ({ count, value: idx }))} data={$statsQuery.data.topUsers.sort((a, b) => b.count - a.count).map(({ count }, idx) => ({ count, value: idx }))}
label={(x) => x < $statsQuery.data.topUsers.length ? $statsQuery.data.topUsers[Math.floor(x)].name : 'No Users'} label={(x) => x < $statsQuery.data.topUsers.length ? $statsQuery.data.topUsers[Math.floor(x)].name : 'No Users'}
ylabel="Node Hours [h]"/> ylabel="Node Hours [h]"/>

View File

@ -45,6 +45,7 @@
totalJobs totalJobs
totalWalltime totalWalltime
totalCoreHours totalCoreHours
totalAccHours
} }
}`, }`,
variables: { jobFilters } variables: { jobFilters }
@ -166,6 +167,18 @@
<Icon name="sort-numeric-down" /> <Icon name="sort-numeric-down" />
</Button> </Button>
</th> </th>
<th scope="col">
Total Accelerator Hours
<Button
color={sorting.field == "totalAccHours"
? "primary"
: "light"}
size="sm"
on:click={(e) => changeSorting(e, "totalAccHours")}
>
<Icon name="sort-numeric-down" />
</Button>
</th>
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
@ -205,6 +218,7 @@
<td>{row.totalJobs}</td> <td>{row.totalJobs}</td>
<td>{row.totalWalltime}</td> <td>{row.totalWalltime}</td>
<td>{row.totalCoreHours}</td> <td>{row.totalCoreHours}</td>
<td>{row.totalAccHours}</td>
</tr> </tr>
{:else} {:else}
<tr> <tr>

View File

@ -24,6 +24,7 @@
export let ylabel = '' export let ylabel = ''
export let min = null export let min = null
export let max = null export let max = null
export let small = false
export let label = formatNumber export let label = formatNumber
const fontSize = 12 const fontSize = 12
@ -108,7 +109,7 @@
} else { } else {
const stepsizeX = getStepSize(maxValue, w, 120) const stepsizeX = getStepSize(maxValue, w, 120)
for (let x = 0; x <= maxValue; x += stepsizeX) { for (let x = 0; x <= maxValue; x += stepsizeX) {
ctx.fillText(label(x), getCanvasX(x), height - paddingBottom - Math.floor(labelOffset / 2)) ctx.fillText(label(x), getCanvasX(x), height - paddingBottom - Math.floor(labelOffset / (small ? 8 : 2)))
} }
} }

View File

@ -6,9 +6,8 @@
<thead> <thead>
<tr> <tr>
<th>Name</th> <th>Name</th>
<th>Running Jobs (short ones not listed)</th> <th>Running Jobs</th>
<th>Total Jobs</th> <th>Total Jobs</th>
<th>Short Jobs in past 24h</th>
{{if .User.HasRole .Roles.admin}} {{if .User.HasRole .Roles.admin}}
<th>System View</th> <th>System View</th>
<th>Analysis View</th> <th>Analysis View</th>
@ -19,21 +18,19 @@
{{if .User.HasRole .Roles.admin}} {{if .User.HasRole .Roles.admin}}
{{range .Infos.clusters}} {{range .Infos.clusters}}
<tr> <tr>
<td>{{.Name}}</td> <td>{{.ID}}</td>
<td><a href="/monitoring/jobs/?cluster={{.Name}}&state=running">{{.RunningJobs}} jobs</a></td> <td><a href="/monitoring/jobs/?cluster={{.ID}}&state=running">{{.RunningJobs}} jobs</a></td>
<td><a href="/monitoring/jobs/?cluster={{.Name}}">{{.TotalJobs}} jobs</a></td> <td><a href="/monitoring/jobs/?cluster={{.ID}}">{{.TotalJobs}} jobs</a></td>
<td>{{.RecentShortJobs}}</td> <td><a href="/monitoring/systems/{{.ID}}">System View</a></td>
<td><a href="/monitoring/systems/{{.Name}}">System View</a></td> <td><a href="/monitoring/analysis/{{.ID}}">Analysis View</a></td>
<td><a href="/monitoring/analysis/{{.Name}}">Analysis View</a></td>
</tr> </tr>
{{end}} {{end}}
{{else}} {{else}}
{{range .Infos.clusters}} {{range .Infos.clusters}}
<tr> <tr>
<td>{{.Name}}</td> <td>{{.ID}}</td>
<td><a href="/monitoring/jobs/?cluster={{.Name}}&state=running">{{.RunningJobs}} jobs</a></td> <td><a href="/monitoring/jobs/?cluster={{.ID}}&state=running">{{.RunningJobs}} jobs</a></td>
<td><a href="/monitoring/jobs/?cluster={{.Name}}">{{.TotalJobs}} jobs</a></td> <td><a href="/monitoring/jobs/?cluster={{.ID}}">{{.TotalJobs}} jobs</a></td>
<td>{{.RecentShortJobs}}</td>
</tr> </tr>
{{end}} {{end}}
{{end}} {{end}}