diff --git a/api/schema.graphqls b/api/schema.graphqls
index b7c16ce..71a5373 100644
--- a/api/schema.graphqls
+++ b/api/schema.graphqls
@@ -286,10 +286,13 @@ type HistoPoint {
type JobsStatistics {
id: ID! # If `groupBy` was used, ID of the user/project/cluster
name: String! # if User-Statistics: Given Name of Account (ID) Owner
- totalJobs: Int! # Number of jobs that matched
- shortJobs: Int! # Number of jobs with a duration of less than 2 minutes
+ totalJobs: Int! # Number of jobs
+ runningJobs: Int! # Number of running jobs
+ shortJobs: Int! # Number of jobs with a duration shorter than the configured short-job duration
totalWalltime: Int! # Sum of the duration of all matched jobs in hours
+ totalNodeHours: Int! # Sum of the node hours of all matched jobs
totalCoreHours: Int! # Sum of the core hours of all matched jobs
+ totalAccHours: Int! # Sum of the accelerator (GPU) hours of all matched jobs
histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value
histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes
}
diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go
index 907526e..9d5f7d9 100644
--- a/internal/graph/generated/generated.go
+++ b/internal/graph/generated/generated.go
@@ -143,9 +143,12 @@ type ComplexityRoot struct {
HistNumNodes func(childComplexity int) int
ID func(childComplexity int) int
Name func(childComplexity int) int
+ RunningJobs func(childComplexity int) int
ShortJobs func(childComplexity int) int
+ TotalAccHours func(childComplexity int) int
TotalCoreHours func(childComplexity int) int
TotalJobs func(childComplexity int) int
+ TotalNodeHours func(childComplexity int) int
TotalWalltime func(childComplexity int) int
}
@@ -731,6 +734,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.JobsStatistics.Name(childComplexity), true
+ case "JobsStatistics.runningJobs":
+ if e.complexity.JobsStatistics.RunningJobs == nil {
+ break
+ }
+
+ return e.complexity.JobsStatistics.RunningJobs(childComplexity), true
+
case "JobsStatistics.shortJobs":
if e.complexity.JobsStatistics.ShortJobs == nil {
break
@@ -738,6 +748,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.JobsStatistics.ShortJobs(childComplexity), true
+ case "JobsStatistics.totalAccHours":
+ if e.complexity.JobsStatistics.TotalAccHours == nil {
+ break
+ }
+
+ return e.complexity.JobsStatistics.TotalAccHours(childComplexity), true
+
case "JobsStatistics.totalCoreHours":
if e.complexity.JobsStatistics.TotalCoreHours == nil {
break
@@ -752,6 +769,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
return e.complexity.JobsStatistics.TotalJobs(childComplexity), true
+ case "JobsStatistics.totalNodeHours":
+ if e.complexity.JobsStatistics.TotalNodeHours == nil {
+ break
+ }
+
+ return e.complexity.JobsStatistics.TotalNodeHours(childComplexity), true
+
case "JobsStatistics.totalWalltime":
if e.complexity.JobsStatistics.TotalWalltime == nil {
break
@@ -1764,10 +1788,13 @@ type HistoPoint {
type JobsStatistics {
id: ID! # If ` + "`" + `groupBy` + "`" + ` was used, ID of the user/project/cluster
name: String! # if User-Statistics: Given Name of Account (ID) Owner
- totalJobs: Int! # Number of jobs that matched
- shortJobs: Int! # Number of jobs with a duration of less than 2 minutes
+ totalJobs: Int! # Number of jobs
+ runningJobs: Int! # Number of running jobs
+ shortJobs: Int! # Number of jobs with a duration shorter than the configured short-job duration
totalWalltime: Int! # Sum of the duration of all matched jobs in hours
+ totalNodeHours: Int! # Sum of the node hours of all matched jobs
totalCoreHours: Int! # Sum of the core hours of all matched jobs
+ totalAccHours: Int! # Sum of the accelerator (GPU) hours of all matched jobs
histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value
histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes
}
@@ -4884,6 +4911,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalJobs(ctx context.Co
return fc, nil
}
+func (ec *executionContext) _JobsStatistics_runningJobs(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
+ fc, err := ec.fieldContext_JobsStatistics_runningJobs(ctx, field)
+ if err != nil {
+ return graphql.Null
+ }
+ ctx = graphql.WithFieldContext(ctx, fc)
+ defer func() {
+ if r := recover(); r != nil {
+ ec.Error(ctx, ec.Recover(ctx, r))
+ ret = graphql.Null
+ }
+ }()
+ resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
+ ctx = rctx // use context from middleware stack in children
+ return obj.RunningJobs, nil
+ })
+ if err != nil {
+ ec.Error(ctx, err)
+ return graphql.Null
+ }
+ if resTmp == nil {
+ if !graphql.HasFieldError(ctx, fc) {
+ ec.Errorf(ctx, "must not be null")
+ }
+ return graphql.Null
+ }
+ res := resTmp.(int)
+ fc.Result = res
+ return ec.marshalNInt2int(ctx, field.Selections, res)
+}
+
+func (ec *executionContext) fieldContext_JobsStatistics_runningJobs(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+ fc = &graphql.FieldContext{
+ Object: "JobsStatistics",
+ Field: field,
+ IsMethod: false,
+ IsResolver: false,
+ Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
+ return nil, errors.New("field of type Int does not have child fields")
+ },
+ }
+ return fc, nil
+}
+
func (ec *executionContext) _JobsStatistics_shortJobs(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_JobsStatistics_shortJobs(ctx, field)
if err != nil {
@@ -4972,6 +5043,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalWalltime(ctx contex
return fc, nil
}
+func (ec *executionContext) _JobsStatistics_totalNodeHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
+ fc, err := ec.fieldContext_JobsStatistics_totalNodeHours(ctx, field)
+ if err != nil {
+ return graphql.Null
+ }
+ ctx = graphql.WithFieldContext(ctx, fc)
+ defer func() {
+ if r := recover(); r != nil {
+ ec.Error(ctx, ec.Recover(ctx, r))
+ ret = graphql.Null
+ }
+ }()
+ resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
+ ctx = rctx // use context from middleware stack in children
+ return obj.TotalNodeHours, nil
+ })
+ if err != nil {
+ ec.Error(ctx, err)
+ return graphql.Null
+ }
+ if resTmp == nil {
+ if !graphql.HasFieldError(ctx, fc) {
+ ec.Errorf(ctx, "must not be null")
+ }
+ return graphql.Null
+ }
+ res := resTmp.(int)
+ fc.Result = res
+ return ec.marshalNInt2int(ctx, field.Selections, res)
+}
+
+func (ec *executionContext) fieldContext_JobsStatistics_totalNodeHours(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+ fc = &graphql.FieldContext{
+ Object: "JobsStatistics",
+ Field: field,
+ IsMethod: false,
+ IsResolver: false,
+ Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
+ return nil, errors.New("field of type Int does not have child fields")
+ },
+ }
+ return fc, nil
+}
+
func (ec *executionContext) _JobsStatistics_totalCoreHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field)
if err != nil {
@@ -5016,6 +5131,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalCoreHours(ctx conte
return fc, nil
}
+func (ec *executionContext) _JobsStatistics_totalAccHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
+ fc, err := ec.fieldContext_JobsStatistics_totalAccHours(ctx, field)
+ if err != nil {
+ return graphql.Null
+ }
+ ctx = graphql.WithFieldContext(ctx, fc)
+ defer func() {
+ if r := recover(); r != nil {
+ ec.Error(ctx, ec.Recover(ctx, r))
+ ret = graphql.Null
+ }
+ }()
+ resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
+ ctx = rctx // use context from middleware stack in children
+ return obj.TotalAccHours, nil
+ })
+ if err != nil {
+ ec.Error(ctx, err)
+ return graphql.Null
+ }
+ if resTmp == nil {
+ if !graphql.HasFieldError(ctx, fc) {
+ ec.Errorf(ctx, "must not be null")
+ }
+ return graphql.Null
+ }
+ res := resTmp.(int)
+ fc.Result = res
+ return ec.marshalNInt2int(ctx, field.Selections, res)
+}
+
+func (ec *executionContext) fieldContext_JobsStatistics_totalAccHours(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+ fc = &graphql.FieldContext{
+ Object: "JobsStatistics",
+ Field: field,
+ IsMethod: false,
+ IsResolver: false,
+ Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
+ return nil, errors.New("field of type Int does not have child fields")
+ },
+ }
+ return fc, nil
+}
+
func (ec *executionContext) _JobsStatistics_histDuration(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) {
fc, err := ec.fieldContext_JobsStatistics_histDuration(ctx, field)
if err != nil {
@@ -6867,12 +7026,18 @@ func (ec *executionContext) fieldContext_Query_jobsStatistics(ctx context.Contex
return ec.fieldContext_JobsStatistics_name(ctx, field)
case "totalJobs":
return ec.fieldContext_JobsStatistics_totalJobs(ctx, field)
+ case "runningJobs":
+ return ec.fieldContext_JobsStatistics_runningJobs(ctx, field)
case "shortJobs":
return ec.fieldContext_JobsStatistics_shortJobs(ctx, field)
case "totalWalltime":
return ec.fieldContext_JobsStatistics_totalWalltime(ctx, field)
+ case "totalNodeHours":
+ return ec.fieldContext_JobsStatistics_totalNodeHours(ctx, field)
case "totalCoreHours":
return ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field)
+ case "totalAccHours":
+ return ec.fieldContext_JobsStatistics_totalAccHours(ctx, field)
case "histDuration":
return ec.fieldContext_JobsStatistics_histDuration(ctx, field)
case "histNumNodes":
@@ -12062,6 +12227,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti
out.Values[i] = ec._JobsStatistics_totalJobs(ctx, field, obj)
+ if out.Values[i] == graphql.Null {
+ invalids++
+ }
+ case "runningJobs":
+
+ out.Values[i] = ec._JobsStatistics_runningJobs(ctx, field, obj)
+
if out.Values[i] == graphql.Null {
invalids++
}
@@ -12076,6 +12248,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti
out.Values[i] = ec._JobsStatistics_totalWalltime(ctx, field, obj)
+ if out.Values[i] == graphql.Null {
+ invalids++
+ }
+ case "totalNodeHours":
+
+ out.Values[i] = ec._JobsStatistics_totalNodeHours(ctx, field, obj)
+
if out.Values[i] == graphql.Null {
invalids++
}
@@ -12083,6 +12262,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti
out.Values[i] = ec._JobsStatistics_totalCoreHours(ctx, field, obj)
+ if out.Values[i] == graphql.Null {
+ invalids++
+ }
+ case "totalAccHours":
+
+ out.Values[i] = ec._JobsStatistics_totalAccHours(ctx, field, obj)
+
if out.Values[i] == graphql.Null {
invalids++
}
diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go
index 552204e..90a0be2 100644
--- a/internal/graph/model/models_gen.go
+++ b/internal/graph/model/models_gen.go
@@ -90,9 +90,12 @@ type JobsStatistics struct {
ID string `json:"id"`
Name string `json:"name"`
TotalJobs int `json:"totalJobs"`
+ RunningJobs int `json:"runningJobs"`
ShortJobs int `json:"shortJobs"`
TotalWalltime int `json:"totalWalltime"`
+ TotalNodeHours int `json:"totalNodeHours"`
TotalCoreHours int `json:"totalCoreHours"`
+ TotalAccHours int `json:"totalAccHours"`
HistDuration []*HistoPoint `json:"histDuration"`
HistNumNodes []*HistoPoint `json:"histNumNodes"`
}
diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go
index 76fd93e..1828ee4 100644
--- a/internal/graph/schema.resolvers.go
+++ b/internal/graph/schema.resolvers.go
@@ -263,7 +263,51 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag
// JobsStatistics is the resolver for the jobsStatistics field.
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
- return r.Repo.JobsStatistics(ctx, filter, groupBy)
+ var err error
+ var stats []*model.JobsStatistics
+
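+ // Only run the (potentially expensive) aggregate queries for fields the client actually requested.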
+ if requireField(ctx, "totalJobs") {
+ if groupBy == nil {
+ stats, err = r.Repo.JobsStats(ctx, filter)
+ } else {
+ stats, err = r.Repo.JobsStatsGrouped(ctx, filter, groupBy)
+ }
+ } else {
+ stats = make([]*model.JobsStatistics, 0, 1)
+ }
+
+ if groupBy != nil {
+ if requireField(ctx, "shortJobs") {
+ stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short")
+ }
+ if requireField(ctx, "RunningJobs") {
+ stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running")
+ }
+ } else {
+ if requireField(ctx, "shortJobs") {
+ stats, err = r.Repo.AddJobCount(ctx, filter, stats, "short")
+ }
+ if requireField(ctx, "RunningJobs") {
+ stats, err = r.Repo.AddJobCount(ctx, filter, stats, "running")
+ }
+ }
+
+ if err != nil {
+ return nil, err
+ }
+
+ if requireField(ctx, "histDuration") || requireField(ctx, "histNumNodes") {
+ if groupBy == nil {
+ stats[0], err = r.Repo.AddHistograms(ctx, filter, stats[0])
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ return nil, errors.New("histograms only implemented without groupBy argument")
+ }
+ }
+
+ return stats, nil
}
// JobsCount is the resolver for the jobsCount field.
diff --git a/internal/graph/stats.go b/internal/graph/util.go
similarity index 95%
rename from internal/graph/stats.go
rename to internal/graph/util.go
index 46aac11..c9423e1 100644
--- a/internal/graph/stats.go
+++ b/internal/graph/util.go
@@ -10,6 +10,7 @@ import (
"fmt"
"math"
+ "github.com/99designs/gqlgen/graphql"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/log"
@@ -132,3 +133,15 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
Metrics: res,
}, nil
}
+
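+// requireField reports whether the current GraphQL selection set includes the
+// named field. gqlgen returns the field names as spelled in the schema, i.e.
+// camelCase such as "runningJobs" or "histDuration".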
+func requireField(ctx context.Context, name string) bool {
+ fields := graphql.CollectAllFields(ctx)
+
+ for _, f := range fields {
+ if f == name {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/internal/repository/dbConnection.go b/internal/repository/dbConnection.go
index 8082bc4..38a258a 100644
--- a/internal/repository/dbConnection.go
+++ b/internal/repository/dbConnection.go
@@ -6,10 +6,10 @@ package repository
import (
"database/sql"
- "log"
"sync"
"time"
+ "github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/jmoiron/sqlx"
"github.com/mattn/go-sqlite3"
"github.com/qustavo/sqlhooks/v2"
@@ -48,15 +48,17 @@ func Connect(driver string, db string) {
switch driver {
case "sqlite3":
- sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{}))
-
// - Set WAL mode (not strictly necessary each time because it's persisted in the database, but good for first run)
// - Set busy timeout, so concurrent writers wait on each other instead of erroring immediately
// - Enable foreign key checks
opts.URL += "?_journal=WAL&_timeout=5000&_fk=true"
- dbHandle, err = sqlx.Open("sqlite3WithHooks", opts.URL)
- // dbHandle, err = sqlx.Open("sqlite3", opts.URL)
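+ // Register the sqlhooks wrapper (which instruments every query) only in debug
+ // mode, so normal operation avoids the extra per-query overhead.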
+ if log.Loglevel() == "debug" {
+ sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{}))
+ dbHandle, err = sqlx.Open("sqlite3WithHooks", opts.URL)
+ } else {
+ dbHandle, err = sqlx.Open("sqlite3", opts.URL)
+ }
if err != nil {
log.Fatal(err)
}
diff --git a/internal/repository/job.go b/internal/repository/job.go
index 9f2397a..ece960a 100644
--- a/internal/repository/job.go
+++ b/internal/repository/job.go
@@ -14,12 +14,9 @@ import (
"sync"
"time"
- "github.com/99designs/gqlgen/graphql"
"github.com/ClusterCockpit/cc-backend/internal/auth"
- "github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
- "github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/lrucache"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
@@ -747,242 +744,6 @@ func (r *JobRepository) FindJobsBefore(startTime int64) ([]*schema.Job, error) {
return jobs, nil
}
-// GraphQL validation should make sure that no unkown values can be specified.
-var groupBy2column = map[model.Aggregate]string{
- model.AggregateUser: "job.user",
- model.AggregateProject: "job.project",
- model.AggregateCluster: "job.cluster",
-}
-
-// Helper function for the jobsStatistics GraphQL query placed here so that schema.resolvers.go is not too full.
-func (r *JobRepository) JobsStatistics(ctx context.Context,
- filter []*model.JobFilter,
- groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
-
- start := time.Now()
- // In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string)
- stats := map[string]*model.JobsStatistics{}
- var castType string
-
- switch r.driver {
- case "sqlite3":
- castType = "int"
- case "mysql":
- castType = "unsigned"
- }
-
- // `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those.
- for _, cluster := range archive.Clusters {
- for _, subcluster := range cluster.SubClusters {
- corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType)
- var rawQuery sq.SelectBuilder
- if groupBy == nil {
- rawQuery = sq.Select(
- "''",
- "COUNT(job.id)",
- fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
- corehoursCol,
- ).From("job")
- } else {
- col := groupBy2column[*groupBy]
- rawQuery = sq.Select(
- col,
- "COUNT(job.id)",
- fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
- corehoursCol,
- ).From("job").GroupBy(col)
- }
-
- rawQuery = rawQuery.
- Where("job.cluster = ?", cluster.Name).
- Where("job.subcluster = ?", subcluster.Name)
-
- query, qerr := SecurityCheck(ctx, rawQuery)
-
- if qerr != nil {
- return nil, qerr
- }
-
- for _, f := range filter {
- query = BuildWhereClause(f, query)
- }
-
- rows, err := query.RunWith(r.DB).Query()
- if err != nil {
- log.Warn("Error while querying DB for job statistics")
- return nil, err
- }
-
- for rows.Next() {
- var id sql.NullString
- var jobs, walltime, corehours sql.NullInt64
- if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil {
- log.Warn("Error while scanning rows")
- return nil, err
- }
-
- if id.Valid {
- if s, ok := stats[id.String]; ok {
- s.TotalJobs += int(jobs.Int64)
- s.TotalWalltime += int(walltime.Int64)
- s.TotalCoreHours += int(corehours.Int64)
- } else {
- stats[id.String] = &model.JobsStatistics{
- ID: id.String,
- TotalJobs: int(jobs.Int64),
- TotalWalltime: int(walltime.Int64),
- TotalCoreHours: int(corehours.Int64),
- }
- }
- }
- }
-
- }
- }
-
- if groupBy == nil {
-
- query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < ?", config.Keys.ShortRunningJobsDuration)
- query, qerr := SecurityCheck(ctx, query)
-
- if qerr != nil {
- return nil, qerr
- }
-
- for _, f := range filter {
- query = BuildWhereClause(f, query)
- }
- if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil {
- log.Warn("Error while scanning rows for short job stats")
- return nil, err
- }
- } else {
- col := groupBy2column[*groupBy]
-
- query := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < ?", config.Keys.ShortRunningJobsDuration)
-
- query, qerr := SecurityCheck(ctx, query)
-
- if qerr != nil {
- return nil, qerr
- }
-
- for _, f := range filter {
- query = BuildWhereClause(f, query)
- }
- rows, err := query.RunWith(r.DB).Query()
- if err != nil {
- log.Warn("Error while querying jobs for short jobs")
- return nil, err
- }
-
- for rows.Next() {
- var id sql.NullString
- var shortJobs sql.NullInt64
- if err := rows.Scan(&id, &shortJobs); err != nil {
- log.Warn("Error while scanning rows for short jobs")
- return nil, err
- }
-
- if id.Valid {
- stats[id.String].ShortJobs = int(shortJobs.Int64)
- }
- }
-
- if col == "job.user" {
- for id := range stats {
- emptyDash := "-"
- user := auth.GetUser(ctx)
- name, _ := r.FindColumnValue(user, id, "user", "name", "username", false)
- if name != "" {
- stats[id].Name = name
- } else {
- stats[id].Name = emptyDash
- }
- }
- }
- }
-
- // Calculating the histogram data is expensive, so only do it if needed.
- // An explicit resolver can not be used because we need to know the filters.
- histogramsNeeded := false
- fields := graphql.CollectFieldsCtx(ctx, nil)
- for _, col := range fields {
- if col.Name == "histDuration" || col.Name == "histNumNodes" {
- histogramsNeeded = true
- }
- }
-
- res := make([]*model.JobsStatistics, 0, len(stats))
- for _, stat := range stats {
- res = append(res, stat)
- id, col := "", ""
- if groupBy != nil {
- id = stat.ID
- col = groupBy2column[*groupBy]
- }
-
- if histogramsNeeded {
- var err error
- value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
- stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter, id, col)
- if err != nil {
- log.Warn("Error while loading job statistics histogram: running jobs")
- return nil, err
- }
-
- stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, id, col)
- if err != nil {
- log.Warn("Error while loading job statistics histogram: num nodes")
- return nil, err
- }
- }
- }
-
- log.Infof("Timer JobStatistics %s", time.Since(start))
- return res, nil
-}
-
-// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used
-// to add a condition to the query of the kind "<col> = <id>".
-func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context,
- value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) {
-
- start := time.Now()
- query, qerr := SecurityCheck(ctx, sq.Select(value, "COUNT(job.id) AS count").From("job"))
-
- if qerr != nil {
- return nil, qerr
- }
-
- for _, f := range filters {
- query = BuildWhereClause(f, query)
- }
-
- if len(id) != 0 && len(col) != 0 {
- query = query.Where(col+" = ?", id)
- }
-
- rows, err := query.GroupBy("value").RunWith(r.DB).Query()
- if err != nil {
- log.Error("Error while running query")
- return nil, err
- }
-
- points := make([]*model.HistoPoint, 0)
- for rows.Next() {
- point := model.HistoPoint{}
- if err := rows.Scan(&point.Value, &point.Count); err != nil {
- log.Warn("Error while scanning rows")
- return nil, err
- }
-
- points = append(points, &point)
- }
- log.Infof("Timer jobsStatisticsHistogram %s", time.Since(start))
- return points, nil
-}
-
const NamedJobInsert string = `INSERT INTO job (
job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc,
exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data,
@@ -1007,96 +768,3 @@ func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) {
return id, nil
}
-
-type Transaction struct {
- tx *sqlx.Tx
- stmt *sqlx.NamedStmt
-}
-
-func (r *JobRepository) TransactionInit() (*Transaction, error) {
- var err error
- t := new(Transaction)
- // Inserts are bundled into transactions because in sqlite,
- // that speeds up inserts A LOT.
- t.tx, err = r.DB.Beginx()
- if err != nil {
- log.Warn("Error while bundling transactions")
- return nil, err
- }
-
- t.stmt, err = t.tx.PrepareNamed(NamedJobInsert)
- if err != nil {
- log.Warn("Error while preparing namedJobInsert")
- return nil, err
- }
-
- return t, nil
-}
-
-func (r *JobRepository) TransactionCommit(t *Transaction) error {
- var err error
- if t.tx != nil {
- if err = t.tx.Commit(); err != nil {
- log.Warn("Error while committing transactions")
- return err
- }
- }
-
- t.tx, err = r.DB.Beginx()
- if err != nil {
- log.Warn("Error while bundling transactions")
- return err
- }
-
- t.stmt = t.tx.NamedStmt(t.stmt)
- return nil
-}
-
-func (r *JobRepository) TransactionEnd(t *Transaction) error {
- if err := t.tx.Commit(); err != nil {
- log.Warn("Error while committing SQL transactions")
- return err
- }
-
- return nil
-}
-
-func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, error) {
- res, err := t.stmt.Exec(job)
- if err != nil {
- log.Errorf("repository initDB(): %v", err)
- return 0, err
- }
-
- id, err := res.LastInsertId()
- if err != nil {
- log.Errorf("repository initDB(): %v", err)
- return 0, err
- }
-
- return id, nil
-}
-
-func (r *JobRepository) TransactionAddTag(t *Transaction, tag *schema.Tag) (int64, error) {
- res, err := t.tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
- if err != nil {
- log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
- return 0, err
- }
- tagId, err := res.LastInsertId()
- if err != nil {
- log.Warn("Error while getting last insert ID")
- return 0, err
- }
-
- return tagId, nil
-}
-
-func (r *JobRepository) TransactionSetTag(t *Transaction, jobId int64, tagId int64) error {
- if _, err := t.tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobId, tagId); err != nil {
- log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", jobId, tagId)
- return err
- }
-
- return nil
-}
diff --git a/internal/repository/stats.go b/internal/repository/stats.go
new file mode 100644
index 0000000..d96c20a
--- /dev/null
+++ b/internal/repository/stats.go
@@ -0,0 +1,411 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package repository
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "time"
+
+ "github.com/ClusterCockpit/cc-backend/internal/auth"
+ "github.com/ClusterCockpit/cc-backend/internal/config"
+ "github.com/ClusterCockpit/cc-backend/internal/graph/model"
+ "github.com/ClusterCockpit/cc-backend/pkg/log"
+ sq "github.com/Masterminds/squirrel"
+)
+
+// GraphQL validation should make sure that no unknown values can be specified.
+var groupBy2column = map[model.Aggregate]string{
+ model.AggregateUser: "job.user",
+ model.AggregateProject: "job.project",
+ model.AggregateCluster: "job.cluster",
+}
+
+func (r *JobRepository) buildCountQuery(
+ filter []*model.JobFilter,
+ kind string,
+ col string) sq.SelectBuilder {
+
+ var query sq.SelectBuilder
+
+ if col != "" {
+ // Scan columns: id, cnt
+ query = sq.Select(col, "COUNT(job.id)").From("job").GroupBy(col)
+ } else {
+ // Scan columns: cnt
+ query = sq.Select("COUNT(job.id)").From("job")
+ }
+
+ switch kind {
+ case "running":
+ query = query.Where("job.job_state = ?", "running")
+ case "short":
+ query = query.Where("job.duration < ?", config.Keys.ShortRunningJobsDuration)
+ }
+
+ for _, f := range filter {
+ query = BuildWhereClause(f, query)
+ }
+
+ return query
+}
+
+func (r *JobRepository) buildStatsQuery(
+ filter []*model.JobFilter,
+ col string) sq.SelectBuilder {
+
+ var query sq.SelectBuilder
+ castType := r.getCastType()
+
+ if col != "" {
+ // Scan columns: id, totalJobs, totalWalltime, totalNodeHours, totalCoreHours, totalAccHours
+ query = sq.Select(col, "COUNT(job.id)",
+ fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
+ fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType),
+ fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType),
+ fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s)", castType),
+ ).From("job").GroupBy(col)
+ } else {
+ // Scan columns: totalJobs, totalWalltime, totalNodeHours, totalCoreHours, totalAccHours
+ query = sq.Select("COUNT(job.id)",
+ fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
+ fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType),
+ fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType),
+ fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s)", castType),
+ ).From("job")
+ }
+
+ for _, f := range filter {
+ query = BuildWhereClause(f, query)
+ }
+
+ return query
+}
+
+func (r *JobRepository) getUserName(ctx context.Context, id string) string {
+ user := auth.GetUser(ctx)
+ name, _ := r.FindColumnValue(user, id, "user", "name", "username", false)
+ if name != "" {
+ return name
+ } else {
+ return "-"
+ }
+}
+
+func (r *JobRepository) getCastType() string {
+ var castType string
+
+ switch r.driver {
+ case "sqlite3":
+ castType = "int"
+ case "mysql":
+ castType = "unsigned"
+ default:
+ castType = ""
+ }
+
+ return castType
+}
+
+func (r *JobRepository) JobsStatsGrouped(
+ ctx context.Context,
+ filter []*model.JobFilter,
+ groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
+
+ start := time.Now()
+ col := groupBy2column[*groupBy]
+ query := r.buildStatsQuery(filter, col)
+ query, err := SecurityCheck(ctx, query)
+ if err != nil {
+ return nil, err
+ }
+
+ rows, err := query.RunWith(r.DB).Query()
+ if err != nil {
+ log.Warn("Error while querying DB for job statistics")
+ return nil, err
+ }
+
+ stats := make([]*model.JobsStatistics, 0, 100)
+
+ for rows.Next() {
+ var id sql.NullString
+ var jobs, walltime, nodeHours, coreHours, accHours sql.NullInt64
+ if err := rows.Scan(&id, &jobs, &walltime, &nodeHours, &coreHours, &accHours); err != nil {
+ log.Warn("Error while scanning rows")
+ return nil, err
+ }
+
+ if id.Valid {
+ var totalCoreHours, totalAccHours int
+
+ if coreHours.Valid {
+ totalCoreHours = int(coreHours.Int64)
+ }
+ if accHours.Valid {
+ totalAccHours = int(accHours.Int64)
+ }
+
+ if col == "job.user" {
+ name := r.getUserName(ctx, id.String)
+ stats = append(stats,
+ &model.JobsStatistics{
+ ID: id.String,
+ Name: name,
+ TotalJobs: int(jobs.Int64),
+ TotalWalltime: int(walltime.Int64),
+ TotalCoreHours: totalCoreHours,
+ TotalAccHours: totalAccHours})
+ } else {
+ stats = append(stats,
+ &model.JobsStatistics{
+ ID: id.String,
+ TotalJobs: int(jobs.Int64),
+ TotalWalltime: int(walltime.Int64),
+ TotalCoreHours: totalCoreHours,
+ TotalAccHours: totalAccHours})
+ }
+ }
+ }
+
+ log.Infof("Timer JobsStatsGrouped %s", time.Since(start))
+ return stats, nil
+}
+
+func (r *JobRepository) JobsStats(
+ ctx context.Context,
+ filter []*model.JobFilter) ([]*model.JobsStatistics, error) {
+
+ start := time.Now()
+ query := r.buildStatsQuery(filter, "")
+ query, err := SecurityCheck(ctx, query)
+ if err != nil {
+ return nil, err
+ }
+
+ row := query.RunWith(r.DB).QueryRow()
+ stats := make([]*model.JobsStatistics, 0, 1)
+
+ var jobs, walltime, nodeHours, coreHours, accHours sql.NullInt64
+ if err := row.Scan(&jobs, &walltime, &nodeHours, &coreHours, &accHours); err != nil {
+ log.Warn("Error while scanning rows")
+ return nil, err
+ }
+
+ if jobs.Valid {
+ var totalCoreHours, totalAccHours int
+
+ if coreHours.Valid {
+ totalCoreHours = int(coreHours.Int64)
+ }
+ if accHours.Valid {
+ totalAccHours = int(accHours.Int64)
+ }
+ stats = append(stats,
+ &model.JobsStatistics{
+ TotalJobs: int(jobs.Int64),
+ TotalWalltime: int(walltime.Int64),
+ TotalCoreHours: totalCoreHours,
+ TotalAccHours: totalAccHours})
+ }
+
+ log.Infof("Timer JobStats %s", time.Since(start))
+ return stats, nil
+}
+
+func (r *JobRepository) JobCountGrouped(
+ ctx context.Context,
+ filter []*model.JobFilter,
+ groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
+
+ start := time.Now()
+ col := groupBy2column[*groupBy]
+ query := r.buildCountQuery(filter, "", col)
+ query, err := SecurityCheck(ctx, query)
+ if err != nil {
+ return nil, err
+ }
+ rows, err := query.RunWith(r.DB).Query()
+ if err != nil {
+ log.Warn("Error while querying DB for job statistics")
+ return nil, err
+ }
+
+ stats := make([]*model.JobsStatistics, 0, 100)
+
+ for rows.Next() {
+ var id sql.NullString
+ var cnt sql.NullInt64
+ if err := rows.Scan(&id, &cnt); err != nil {
+ log.Warn("Error while scanning rows")
+ return nil, err
+ }
+ if id.Valid {
+ stats = append(stats,
+ &model.JobsStatistics{
+ ID: id.String,
+ TotalJobs: int(cnt.Int64)})
+ }
+ }
+
+ log.Infof("Timer JobCountGrouped %s", time.Since(start))
+ return stats, nil
+}
+
+func (r *JobRepository) AddJobCountGrouped(
+ ctx context.Context,
+ filter []*model.JobFilter,
+ groupBy *model.Aggregate,
+ stats []*model.JobsStatistics,
+ kind string) ([]*model.JobsStatistics, error) {
+
+ start := time.Now()
+ col := groupBy2column[*groupBy]
+ query := r.buildCountQuery(filter, kind, col)
+ query, err := SecurityCheck(ctx, query)
+ if err != nil {
+ return nil, err
+ }
+ rows, err := query.RunWith(r.DB).Query()
+ if err != nil {
+ log.Warn("Error while querying DB for job statistics")
+ return nil, err
+ }
+
+ counts := make(map[string]int)
+
+ for rows.Next() {
+ var id sql.NullString
+ var cnt sql.NullInt64
+ if err := rows.Scan(&id, &cnt); err != nil {
+ log.Warn("Error while scanning rows")
+ return nil, err
+ }
+ if id.Valid {
+ counts[id.String] = int(cnt.Int64)
+ }
+ }
+
+ switch kind {
+ case "running":
+ for _, s := range stats {
+ s.RunningJobs = counts[s.ID]
+ }
+ case "short":
+ for _, s := range stats {
+ s.ShortJobs = counts[s.ID]
+ }
+ }
+
+ log.Infof("Timer AddJobCountGrouped %s", time.Since(start))
+ return stats, nil
+}
+
+func (r *JobRepository) AddJobCount(
+ ctx context.Context,
+ filter []*model.JobFilter,
+ stats []*model.JobsStatistics,
+ kind string) ([]*model.JobsStatistics, error) {
+
+ start := time.Now()
+ query := r.buildCountQuery(filter, kind, "")
+ query, err := SecurityCheck(ctx, query)
+ if err != nil {
+ return nil, err
+ }
+ rows, err := query.RunWith(r.DB).Query()
+ if err != nil {
+ log.Warn("Error while querying DB for job statistics")
+ return nil, err
+ }
+
+ var count int
+
+ for rows.Next() {
+ var cnt sql.NullInt64
+ if err := rows.Scan(&cnt); err != nil {
+ log.Warn("Error while scanning rows")
+ return nil, err
+ }
+ // The ungrouped count query returns a single row.
+ count = int(cnt.Int64)
+ }
+
+ switch kind {
+ case "running":
+ for _, s := range stats {
+ s.RunningJobs = count
+ }
+ case "short":
+ for _, s := range stats {
+ s.ShortJobs = count
+ }
+ }
+
+ log.Infof("Timer JobJobCount %s", time.Since(start))
+ return stats, nil
+}
+
+func (r *JobRepository) AddHistograms(
+ ctx context.Context,
+ filter []*model.JobFilter,
+ stat *model.JobsStatistics) (*model.JobsStatistics, error) {
+ start := time.Now()
+
+ castType := r.getCastType()
+ var err error
+ value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType)
+ stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter)
+ if err != nil {
+ log.Warn("Error while loading job statistics histogram: running jobs")
+ return nil, err
+ }
+
+ stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter)
+ if err != nil {
+ log.Warn("Error while loading job statistics histogram: num nodes")
+ return nil, err
+ }
+
+ log.Infof("Timer AddHistograms %s", time.Since(start))
+ return stat, nil
+}
+
+// `value` must be the column grouped by, but renamed to "value"
+func (r *JobRepository) jobsStatisticsHistogram(
+ ctx context.Context,
+ value string,
+ filters []*model.JobFilter) ([]*model.HistoPoint, error) {
+
+ start := time.Now()
+ query, qerr := SecurityCheck(ctx,
+ sq.Select(value, "COUNT(job.id) AS count").From("job"))
+
+ if qerr != nil {
+ return nil, qerr
+ }
+
+ for _, f := range filters {
+ query = BuildWhereClause(f, query)
+ }
+
+ rows, err := query.GroupBy("value").RunWith(r.DB).Query()
+ if err != nil {
+ log.Error("Error while running query")
+ return nil, err
+ }
+
+ points := make([]*model.HistoPoint, 0)
+ for rows.Next() {
+ point := model.HistoPoint{}
+ if err := rows.Scan(&point.Value, &point.Count); err != nil {
+ log.Warn("Error while scanning rows")
+ return nil, err
+ }
+
+ points = append(points, &point)
+ }
+ log.Infof("Timer jobsStatisticsHistogram %s", time.Since(start))
+ return points, nil
+}
diff --git a/internal/repository/stats_test.go b/internal/repository/stats_test.go
new file mode 100644
index 0000000..b1a815e
--- /dev/null
+++ b/internal/repository/stats_test.go
@@ -0,0 +1,21 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package repository
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestBuildJobStatsQuery(t *testing.T) {
+ r := setup(t)
+ q := r.buildStatsQuery(nil, "USER")
+
+ sql, _, err := q.ToSql()
+ noErr(t, err)
+
+ fmt.Printf("SQL: %s\n", sql)
+
+}
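A companion smoke test for the count-query builder could follow the same pattern (sketch only, reusing the existing setup and noErr helpers; not part of this change):

func TestBuildJobCountQuery(t *testing.T) {
	r := setup(t)
	q := r.buildCountQuery(nil, "running", "job.cluster")

	sql, args, err := q.ToSql()
	noErr(t, err)

	fmt.Printf("SQL: %s ARGS: %v\n", sql, args)
}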
diff --git a/internal/repository/transaction.go b/internal/repository/transaction.go
new file mode 100644
index 0000000..4d003d7
--- /dev/null
+++ b/internal/repository/transaction.go
@@ -0,0 +1,104 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package repository
+
+import (
+ "github.com/ClusterCockpit/cc-backend/pkg/log"
+ "github.com/ClusterCockpit/cc-backend/pkg/schema"
+ "github.com/jmoiron/sqlx"
+)
+
+type Transaction struct {
+ tx *sqlx.Tx
+ stmt *sqlx.NamedStmt
+}
+
+func (r *JobRepository) TransactionInit() (*Transaction, error) {
+ var err error
+ t := new(Transaction)
+ // Inserts are bundled into transactions because in sqlite,
+ // that speeds up inserts A LOT.
+ t.tx, err = r.DB.Beginx()
+ if err != nil {
+ log.Warn("Error while bundling transactions")
+ return nil, err
+ }
+
+ t.stmt, err = t.tx.PrepareNamed(NamedJobInsert)
+ if err != nil {
+ log.Warn("Error while preparing namedJobInsert")
+ return nil, err
+ }
+
+ return t, nil
+}
+
+func (r *JobRepository) TransactionCommit(t *Transaction) error {
+ var err error
+ if t.tx != nil {
+ if err = t.tx.Commit(); err != nil {
+ log.Warn("Error while committing transactions")
+ return err
+ }
+ }
+
+ t.tx, err = r.DB.Beginx()
+ if err != nil {
+ log.Warn("Error while bundling transactions")
+ return err
+ }
+
+ t.stmt = t.tx.NamedStmt(t.stmt)
+ return nil
+}
+
+func (r *JobRepository) TransactionEnd(t *Transaction) error {
+ if err := t.tx.Commit(); err != nil {
+ log.Warn("Error while committing SQL transactions")
+ return err
+ }
+
+ return nil
+}
+
+func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, error) {
+ res, err := t.stmt.Exec(job)
+ if err != nil {
+ log.Errorf("repository initDB(): %v", err)
+ return 0, err
+ }
+
+ id, err := res.LastInsertId()
+ if err != nil {
+ log.Errorf("repository initDB(): %v", err)
+ return 0, err
+ }
+
+ return id, nil
+}
+
+func (r *JobRepository) TransactionAddTag(t *Transaction, tag *schema.Tag) (int64, error) {
+ res, err := t.tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
+ if err != nil {
+ log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
+ return 0, err
+ }
+ tagId, err := res.LastInsertId()
+ if err != nil {
+ log.Warn("Error while getting last insert ID")
+ return 0, err
+ }
+
+ return tagId, nil
+}
+
+func (r *JobRepository) TransactionSetTag(t *Transaction, jobId int64, tagId int64) error {
+ if _, err := t.tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobId, tagId); err != nil {
+ log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", jobId, tagId)
+ return err
+ }
+
+ return nil
+}
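For reference, a minimal usage sketch of the transaction helpers above (hypothetical caller and batch size, not part of this change):

func bulkImport(r *JobRepository, jobs []schema.Job) error {
	t, err := r.TransactionInit()
	if err != nil {
		return err
	}

	for i, job := range jobs {
		if _, err := r.TransactionAdd(t, job); err != nil {
			return err
		}
		// TransactionCommit commits the current batch and opens a fresh transaction.
		if (i+1)%1000 == 0 {
			if err := r.TransactionCommit(t); err != nil {
				return err
			}
		}
	}

	return r.TransactionEnd(t)
}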
diff --git a/internal/routerConfig/routes.go b/internal/routerConfig/routes.go
index 87d2c0e..2aa3f05 100644
--- a/internal/routerConfig/routes.go
+++ b/internal/routerConfig/routes.go
@@ -14,12 +14,9 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/api"
"github.com/ClusterCockpit/cc-backend/internal/auth"
- "github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/repository"
- "github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/log"
- "github.com/ClusterCockpit/cc-backend/pkg/schema"
"github.com/ClusterCockpit/cc-backend/web"
"github.com/gorilla/mux"
)
@@ -50,47 +47,20 @@ var routes []Route = []Route{
}
func setupHomeRoute(i InfoType, r *http.Request) InfoType {
- type cluster struct {
- Name string
- RunningJobs int
- TotalJobs int
- RecentShortJobs int
- }
jobRepo := repository.GetJobRepository()
+ groupBy := model.AggregateCluster
- runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
- State: []schema.JobState{schema.JobStateRunning},
- }}, nil, nil)
+ stats, err := jobRepo.JobCountGrouped(r.Context(), nil, &groupBy)
if err != nil {
log.Warnf("failed to count jobs: %s", err.Error())
- runningJobs = map[string]int{}
- }
- totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil, nil)
- if err != nil {
- log.Warnf("failed to count jobs: %s", err.Error())
- totalJobs = map[string]int{}
- }
- from := time.Now().Add(-24 * time.Hour)
- recentShortJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{
- StartTime: &schema.TimeRange{From: &from, To: nil},
- Duration: &schema.IntRange{From: 0, To: config.Keys.ShortRunningJobsDuration},
- }}, nil, nil)
- if err != nil {
- log.Warnf("failed to count jobs: %s", err.Error())
- recentShortJobs = map[string]int{}
}
- clusters := make([]cluster, 0)
- for _, c := range archive.Clusters {
- clusters = append(clusters, cluster{
- Name: c.Name,
- RunningJobs: runningJobs[c.Name],
- TotalJobs: totalJobs[c.Name],
- RecentShortJobs: recentShortJobs[c.Name],
- })
+ stats, err = jobRepo.AddJobCountGrouped(r.Context(), nil, &groupBy, stats, "running")
+ if err != nil {
+ log.Warnf("failed to count running jobs: %s", err.Error())
}
- i["clusters"] = clusters
+ i["clusters"] = stats
return i
}
diff --git a/pkg/log/log.go b/pkg/log/log.go
index f514df9..7e89753 100644
--- a/pkg/log/log.go
+++ b/pkg/log/log.go
@@ -41,6 +41,8 @@ var (
CritLog *log.Logger = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile)
)
+var loglevel string = "info"
+
/* CONFIG */
func Init(lvl string, logdate bool) {
@@ -78,6 +80,8 @@ func Init(lvl string, logdate bool) {
ErrLog = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile)
CritLog = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile)
}
+
+ loglevel = lvl
}
/* PRINT */
@@ -170,6 +174,10 @@ func Fatalf(format string, v ...interface{}) {
os.Exit(1)
}
+func Loglevel() string {
+ return loglevel
+}
+
/* SPECIAL */
// func Finfof(w io.Writer, format string, v ...interface{}) {
diff --git a/pkg/schema/job.go b/pkg/schema/job.go
index 3f75551..d967dd0 100644
--- a/pkg/schema/job.go
+++ b/pkg/schema/job.go
@@ -17,14 +17,15 @@ import (
type BaseJob struct {
// The unique identifier of a job
- JobID int64 `json:"jobId" db:"job_id" example:"123000"`
- User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user
- Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project
- Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster
- SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster
- Partition string `json:"partition,omitempty" db:"partition" example:"main"` // The Slurm partition to which the job was submitted
- ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` // The unique identifier of an array job
- NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0)
+ JobID int64 `json:"jobId" db:"job_id" example:"123000"`
+ User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user
+ Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project
+ Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster
+ SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster
+ Partition string `json:"partition,omitempty" db:"partition" example:"main"` // The Slurm partition to which the job was submitted
+ ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` // The unique identifier of an array job
+ NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0)
+ // NumCores int32 `json:"numCores" db:"num_cores" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
NumHWThreads int32 `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0)
NumAcc int32 `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0)
Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user
diff --git a/web/frontend/src/Analysis.root.svelte b/web/frontend/src/Analysis.root.svelte
index b0dd704..42100cf 100644
--- a/web/frontend/src/Analysis.root.svelte
+++ b/web/frontend/src/Analysis.root.svelte
@@ -65,7 +65,7 @@
histNumNodes { count, value }
}
- topUsers: jobsCount(filter: $filters, groupBy: USER, weight: NODE_HOURS, limit: 5) { name, count }
+ topUsers: jobsCount(filter: $jobFilters, groupBy: USER, weight: NODE_HOURS, limit: 5) { name, count }
}
`,
variables: { jobFilters }
@@ -143,7 +143,7 @@
{$statsQuery.data.stats[0].totalJobs} |
- Short Jobs (< 2m) |
+ Short Jobs |
{$statsQuery.data.stats[0].shortJobs} |
@@ -160,7 +160,7 @@
{#key $statsQuery.data.topUsers}
Top Users (by node hours)
b.count - a.count).map(({ count }, idx) => ({ count, value: idx }))}
label={(x) => x < $statsQuery.data.topUsers.length ? $statsQuery.data.topUsers[Math.floor(x)].name : 'No Users'}
ylabel="Node Hours [h]"/>
diff --git a/web/frontend/src/List.root.svelte b/web/frontend/src/List.root.svelte
index b219f35..814a7df 100644
--- a/web/frontend/src/List.root.svelte
+++ b/web/frontend/src/List.root.svelte
@@ -45,6 +45,7 @@
totalJobs
totalWalltime
totalCoreHours
+ totalAccHours
}
}`,
variables: { jobFilters }
@@ -166,6 +167,18 @@
+
+ Total Accelerator Hours
+
+ |
@@ -205,6 +218,7 @@
{row.totalJobs} |
{row.totalWalltime} |
{row.totalCoreHours} |
+ {row.totalAccHours} |
{:else}
diff --git a/web/frontend/src/plots/Histogram.svelte b/web/frontend/src/plots/Histogram.svelte
index aebc8f3..eaaf49c 100644
--- a/web/frontend/src/plots/Histogram.svelte
+++ b/web/frontend/src/plots/Histogram.svelte
@@ -24,6 +24,7 @@
export let ylabel = ''
export let min = null
export let max = null
+ export let small = false
export let label = formatNumber
const fontSize = 12
@@ -108,7 +109,7 @@
} else {
const stepsizeX = getStepSize(maxValue, w, 120)
for (let x = 0; x <= maxValue; x += stepsizeX) {
- ctx.fillText(label(x), getCanvasX(x), height - paddingBottom - Math.floor(labelOffset / 2))
+ ctx.fillText(label(x), getCanvasX(x), height - paddingBottom - Math.floor(labelOffset / (small ? 8 : 2)))
}
}
diff --git a/web/templates/home.tmpl b/web/templates/home.tmpl
index 92c80f9..ff49e1e 100644
--- a/web/templates/home.tmpl
+++ b/web/templates/home.tmpl
@@ -6,9 +6,8 @@
Name |
- Running Jobs (short ones not listed) |
+ Running Jobs |
Total Jobs |
- Short Jobs in past 24h |
{{if .User.HasRole .Roles.admin}}
System View |
Analysis View |
@@ -19,21 +18,19 @@
{{if .User.HasRole .Roles.admin}}
{{range .Infos.clusters}}
- {{.Name}} |
- {{.RunningJobs}} jobs |
- {{.TotalJobs}} jobs |
- {{.RecentShortJobs}} |
- System View |
- Analysis View |
+ {{.ID}} |
+ {{.RunningJobs}} jobs |
+ {{.TotalJobs}} jobs |
+ System View |
+ Analysis View |
{{end}}
{{else}}
{{range .Infos.clusters}}
- {{.Name}} |
- {{.RunningJobs}} jobs |
- {{.TotalJobs}} jobs |
- {{.RecentShortJobs}} |
+ {{.ID}} |
+ {{.RunningJobs}} jobs |
+ {{.TotalJobs}} jobs |
{{end}}
{{end}}