Merge pull request #205 from ClusterCockpit/166_add_scopes_analysis

166 add scopes analysis
Jan Eitzinger authored this commit on 2023-08-31 12:03:37 +02:00; committed by GitHub.
17 changed files with 1295 additions and 534 deletions

View File

@@ -26,21 +26,25 @@ var Keys schema.ProgramConfig = schema.ProgramConfig{
StopJobsExceedingWalltime: 0,
ShortRunningJobsDuration: 5 * 60,
UiDefaults: map[string]interface{}{
"analysis_view_histogramMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"analysis_view_scatterPlotMetrics": [][]string{{"flops_any", "mem_bw"}, {"flops_any", "cpu_load"}, {"cpu_load", "mem_bw"}},
"job_view_nodestats_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"job_view_polarPlotMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"job_view_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"plot_general_colorBackground": true,
"plot_general_colorscheme": []string{"#00bfff", "#0000ff", "#ff00ff", "#ff0000", "#ff8000", "#ffff00", "#80ff00"},
"plot_general_lineWidth": 3,
"plot_list_jobsPerPage": 50,
"plot_list_selectedMetrics": []string{"cpu_load", "mem_used", "flops_any", "mem_bw"},
"plot_view_plotsPerRow": 3,
"plot_view_showPolarplot": true,
"plot_view_showRoofline": true,
"plot_view_showStatTable": true,
"system_view_selectedMetric": "cpu_load",
"analysis_view_histogramMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"analysis_view_scatterPlotMetrics": [][]string{{"flops_any", "mem_bw"}, {"flops_any", "cpu_load"}, {"cpu_load", "mem_bw"}},
"job_view_nodestats_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"job_view_polarPlotMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"job_view_selectedMetrics": []string{"flops_any", "mem_bw", "mem_used"},
"plot_general_colorBackground": true,
"plot_general_colorscheme": []string{"#00bfff", "#0000ff", "#ff00ff", "#ff0000", "#ff8000", "#ffff00", "#80ff00"},
"plot_general_lineWidth": 3,
"plot_list_jobsPerPage": 50,
"plot_list_selectedMetrics": []string{"cpu_load", "mem_used", "flops_any", "mem_bw"},
"plot_view_plotsPerRow": 3,
"plot_view_showPolarplot": true,
"plot_view_showRoofline": true,
"plot_view_showStatTable": true,
"system_view_selectedMetric": "cpu_load",
"analysis_view_selectedTopEntity": "user",
"analysis_view_selectedTopCategory": "totalWalltime",
"status_view_selectedTopUserCategory": "totalJobs",
"status_view_selectedTopProjectCategory": "totalJobs",
},
}
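The four new keys extend the untyped UiDefaults map, so consumers must type-assert each entry before use. A minimal, self-contained sketch of safe access with a fallback; the stringDefault helper and the trimmed map copy are illustrative, not part of this codebase:

package main

import "fmt"

// A trimmed copy of the defaults above, enough to run standalone.
var uiDefaults = map[string]interface{}{
	"analysis_view_selectedTopEntity":   "user",
	"analysis_view_selectedTopCategory": "totalWalltime",
}

// stringDefault returns the configured value if it is a string, else fallback.
func stringDefault(key, fallback string) string {
	if v, ok := uiDefaults[key].(string); ok {
		return v
	}
	return fallback
}

func main() {
	fmt.Println(stringDefault("analysis_view_selectedTopEntity", "user")) // "user"
	fmt.Println(stringDefault("some_missing_key", "totalJobs"))           // falls back
}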

File diff suppressed because it is too large.

View File

@@ -22,8 +22,8 @@ type FloatRange struct {
}
type Footprints struct {
TimeWeights *TimeWeights `json:"timeWeights"`
Metrics []*MetricFootprints `json:"metrics"`
}
type HistoPoint struct {
@@ -91,11 +91,16 @@ type JobsStatistics struct {
RunningJobs int `json:"runningJobs"`
ShortJobs int `json:"shortJobs"`
TotalWalltime int `json:"totalWalltime"`
TotalNodes int `json:"totalNodes"`
TotalNodeHours int `json:"totalNodeHours"`
TotalCores int `json:"totalCores"`
TotalCoreHours int `json:"totalCoreHours"`
TotalAccs int `json:"totalAccs"`
TotalAccHours int `json:"totalAccHours"`
HistDuration []*HistoPoint `json:"histDuration"`
HistNumNodes []*HistoPoint `json:"histNumNodes"`
HistNumCores []*HistoPoint `json:"histNumCores"`
HistNumAccs []*HistoPoint `json:"histNumAccs"`
}
type MetricFootprints struct {
@@ -133,6 +138,12 @@ type TimeRangeOutput struct {
To time.Time `json:"to"`
}
type TimeWeights struct {
NodeHours []schema.Float `json:"nodeHours"`
AccHours []schema.Float `json:"accHours"`
CoreHours []schema.Float `json:"coreHours"`
}
type User struct {
Username string `json:"username"`
Name string `json:"name"`
@@ -182,6 +193,59 @@ func (e Aggregate) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}
type SortByAggregate string
const (
SortByAggregateTotalwalltime SortByAggregate = "TOTALWALLTIME"
SortByAggregateTotaljobs SortByAggregate = "TOTALJOBS"
SortByAggregateTotalnodes SortByAggregate = "TOTALNODES"
SortByAggregateTotalnodehours SortByAggregate = "TOTALNODEHOURS"
SortByAggregateTotalcores SortByAggregate = "TOTALCORES"
SortByAggregateTotalcorehours SortByAggregate = "TOTALCOREHOURS"
SortByAggregateTotalaccs SortByAggregate = "TOTALACCS"
SortByAggregateTotalacchours SortByAggregate = "TOTALACCHOURS"
)
var AllSortByAggregate = []SortByAggregate{
SortByAggregateTotalwalltime,
SortByAggregateTotaljobs,
SortByAggregateTotalnodes,
SortByAggregateTotalnodehours,
SortByAggregateTotalcores,
SortByAggregateTotalcorehours,
SortByAggregateTotalaccs,
SortByAggregateTotalacchours,
}
func (e SortByAggregate) IsValid() bool {
switch e {
case SortByAggregateTotalwalltime, SortByAggregateTotaljobs, SortByAggregateTotalnodes, SortByAggregateTotalnodehours, SortByAggregateTotalcores, SortByAggregateTotalcorehours, SortByAggregateTotalaccs, SortByAggregateTotalacchours:
return true
}
return false
}
func (e SortByAggregate) String() string {
return string(e)
}
func (e *SortByAggregate) UnmarshalGQL(v interface{}) error {
str, ok := v.(string)
if !ok {
return fmt.Errorf("enums must be strings")
}
*e = SortByAggregate(str)
if !e.IsValid() {
return fmt.Errorf("%s is not a valid SortByAggregate", str)
}
return nil
}
func (e SortByAggregate) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}
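The enum follows gqlgen's marshaling contract: UnmarshalGQL validates incoming strings, MarshalGQL quotes the value for the wire. A compact, standalone sketch of that round trip; the type is copied here in trimmed form (one constant instead of eight) so the example runs on its own:

package main

import (
	"fmt"
	"io"
	"os"
	"strconv"
)

type SortByAggregate string

const SortByAggregateTotalcorehours SortByAggregate = "TOTALCOREHOURS"

// Trimmed: the real IsValid checks all eight enum values.
func (e SortByAggregate) IsValid() bool {
	return e == SortByAggregateTotalcorehours
}

func (e *SortByAggregate) UnmarshalGQL(v interface{}) error {
	str, ok := v.(string)
	if !ok {
		return fmt.Errorf("enums must be strings")
	}
	*e = SortByAggregate(str)
	if !e.IsValid() {
		return fmt.Errorf("%s is not a valid SortByAggregate", str)
	}
	return nil
}

func (e SortByAggregate) MarshalGQL(w io.Writer) {
	fmt.Fprint(w, strconv.Quote(string(e)))
}

func main() {
	var s SortByAggregate
	if err := s.UnmarshalGQL("TOTALCOREHOURS"); err != nil { // accepted: valid value
		panic(err)
	}
	s.MarshalGQL(os.Stdout) // prints "TOTALCOREHOURS" (quoted)
	fmt.Println()

	var bad SortByAggregate
	fmt.Println(bad.UnmarshalGQL("coreHours")) // rejected: enum values are upper-case
}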
type SortDirectionEnum string
const (
@@ -222,44 +286,3 @@ func (e *SortDirectionEnum) UnmarshalGQL(v interface{}) error {
func (e SortDirectionEnum) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}
type Weights string
const (
WeightsNodeCount Weights = "NODE_COUNT"
WeightsNodeHours Weights = "NODE_HOURS"
)
var AllWeights = []Weights{
WeightsNodeCount,
WeightsNodeHours,
}
func (e Weights) IsValid() bool {
switch e {
case WeightsNodeCount, WeightsNodeHours:
return true
}
return false
}
func (e Weights) String() string {
return string(e)
}
func (e *Weights) UnmarshalGQL(v interface{}) error {
str, ok := v.(string)
if !ok {
return fmt.Errorf("enums must be strings")
}
*e = Weights(str)
if !e.IsValid() {
return fmt.Errorf("%s is not a valid Weights", str)
}
return nil
}
func (e Weights) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}

View File

@@ -244,34 +244,34 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag
}
// JobsStatistics is the resolver for the jobsStatistics field.
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, sortBy *model.SortByAggregate, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
var err error
var stats []*model.JobsStatistics
if requireField(ctx, "totalJobs") {
if requireField(ctx, "totalJobs") || requireField(ctx, "totalWalltime") || requireField(ctx, "totalNodes") || requireField(ctx, "totalCores") ||
requireField(ctx, "totalAccs") || requireField(ctx, "totalNodeHours") || requireField(ctx, "totalCoreHours") || requireField(ctx, "totalAccHours") {
if groupBy == nil {
stats, err = r.Repo.JobsStats(ctx, filter)
} else {
stats, err = r.Repo.JobsStatsGrouped(ctx, filter, page, sortBy, groupBy)
}
} else {
stats = make([]*model.JobsStatistics, 0, 1)
stats = append(stats, &model.JobsStatistics{})
}
if groupBy != nil {
if requireField(ctx, "shortJobs") {
stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short")
}
if requireField(ctx, "RunningJobs") {
if requireField(ctx, "runningJobs") {
stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running")
}
} else {
if requireField(ctx, "shortJobs") {
stats, err = r.Repo.AddJobCount(ctx, filter, stats, "short")
}
if requireField(ctx, "RunningJobs") {
if requireField(ctx, "runningJobs") {
stats, err = r.Repo.AddJobCount(ctx, filter, stats, "running")
}
}
@@ -280,7 +280,7 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
return nil, err
}
if requireField(ctx, "histDuration") || requireField(ctx, "histNumNodes") {
if requireField(ctx, "histDuration") || requireField(ctx, "histNumNodes") || requireField(ctx, "histNumCores") || requireField(ctx, "histNumAccs") {
if groupBy == nil {
stats[0], err = r.Repo.AddHistograms(ctx, filter, stats[0])
if err != nil {
@@ -294,24 +294,6 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
return stats, nil
}
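With page and sortBy added to the resolver, a client can fetch a sorted, paged per-group statistics table in one round trip. An illustrative query, embedded as a Go string; the enum and argument spellings mirror the generated Go models above and should be checked against the actual GraphQL schema:

// Illustrative only; spellings mirror the generated Go models above.
const jobsStatisticsQuery = `
query {
  jobsStatistics(
    groupBy: USER
    sortBy: TOTALCOREHOURS
    page: { page: 1, itemsPerPage: 10 }
  ) {
    id
    totalJobs
    totalCoreHours
    totalAccHours
  }
}`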
// JobsCount is the resolver for the jobsCount field.
func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) {
counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, weight, limit)
if err != nil {
log.Warn("Error while counting grouped jobs")
return nil, err
}
res := make([]*model.Count, 0, len(counts))
for name, count := range counts {
res = append(res, &model.Count{
Name: name,
Count: count,
})
}
return res, nil
}
// RooflineHeatmap is the resolver for the rooflineHeatmap field.
func (r *queryResolver) RooflineHeatmap(ctx context.Context, filter []*model.JobFilter, rows int, cols int, minX float64, minY float64, maxX float64, maxY float64) ([][]float64, error) {
return r.rooflineHeatmap(ctx, filter, rows, cols, minX, minY, maxX, maxY)

View File

@@ -15,6 +15,7 @@ import (
"github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
// "github.com/ClusterCockpit/cc-backend/pkg/archive"
)
const MAX_JOBS_FOR_ANALYSIS = 500
@@ -106,7 +107,11 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
avgs[i] = make([]schema.Float, 0, len(jobs))
}
timeweights := new(model.TimeWeights)
timeweights.NodeHours = make([]schema.Float, 0, len(jobs))
timeweights.AccHours = make([]schema.Float, 0, len(jobs))
timeweights.CoreHours = make([]schema.Float, 0, len(jobs))
for _, job := range jobs {
if job.MonitoringStatus == schema.MonitoringStatusDisabled || job.MonitoringStatus == schema.MonitoringStatusArchivingFailed {
continue
@@ -117,7 +122,18 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
return nil, err
}
// #166 collect arrays: Null values or no null values?
timeweights.NodeHours = append(timeweights.NodeHours, schema.Float(float64(job.Duration)/60.0*float64(job.NumNodes)))
if job.NumAcc > 0 {
timeweights.AccHours = append(timeweights.AccHours, schema.Float(float64(job.Duration)/60.0*float64(job.NumAcc)))
} else {
timeweights.AccHours = append(timeweights.AccHours, schema.Float(1.0))
}
if job.NumHWThreads > 0 {
timeweights.CoreHours = append(timeweights.CoreHours, schema.Float(float64(job.Duration)/60.0*float64(job.NumHWThreads))) // SQLite HWThreads == Cores; numCoresForJob(job)
} else {
timeweights.CoreHours = append(timeweights.CoreHours, schema.Float(1.0))
}
}
res := make([]*model.MetricFootprints, len(avgs))
@@ -129,11 +145,34 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF
}
return &model.Footprints{
TimeWeights: timeweights,
Metrics: res,
}, nil
}
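The weights collected above let footprint averages be weighted by resource time instead of treating every job equally; note the fallback weight of 1.0 for jobs without accelerators or hwthread counts, which keeps the weight arrays aligned with the jobs list. A standalone sketch of the weighted mean this enables; weightedMean is a hypothetical helper, not part of this diff:

package main

import "fmt"

// weightedMean computes sum(v[i]*w[i]) / sum(w[i]); plain float64 is used
// here instead of schema.Float to keep the sketch self-contained.
func weightedMean(values, weights []float64) float64 {
	if len(values) != len(weights) || len(values) == 0 {
		return 0
	}
	var num, den float64
	for i, v := range values {
		num += v * weights[i]
		den += weights[i]
	}
	if den == 0 {
		return 0
	}
	return num / den
}

func main() {
	avgs := []float64{100, 200, 300}      // per-job metric averages
	nodeHours := []float64{1, 2, 1}       // duration/60 * numNodes, as collected above
	fmt.Println(weightedMean(avgs, nodeHours)) // 200: the 2-node-hour job counts double
}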
// func numCoresForJob(job *schema.Job) (numCores int) {
// subcluster, scerr := archive.GetSubCluster(job.Cluster, job.SubCluster)
// if scerr != nil {
// return 1
// }
// totalJobCores := 0
// topology := subcluster.Topology
// for _, host := range job.Resources {
// hwthreads := host.HWThreads
// if hwthreads == nil {
// hwthreads = topology.Node
// }
// hostCores, _ := topology.GetCoresFromHWThreads(hwthreads)
// totalJobCores += len(hostCores)
// }
// return totalJobCores
// }
func requireField(ctx context.Context, name string) bool {
fields := graphql.CollectAllFields(ctx)

View File

@@ -506,7 +506,7 @@ func (ccms *CCMetricStore) LoadStats(
metrics []string,
ctx context.Context) (map[string]map[string]schema.MetricStatistics, error) {
queries, _, err := ccms.buildQueries(job, metrics, []schema.MetricScope{schema.MetricScopeNode}) // #166 Add scope here for analysis view accelerator normalization?
if err != nil {
log.Warn("Error while building query")
return nil, err
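The inline question above asks whether an additional scope should be requested here so accelerator metrics can be normalized in the analysis view. A minimal, untested sketch of what that could look like, assuming schema.MetricScopeAccelerator is the accelerator scope constant used elsewhere in this codebase:

// Hypothetical variant of the call above: request node and accelerator
// scopes together so per-device statistics are available for normalization.
queries, _, err := ccms.buildQueries(job, metrics,
	[]schema.MetricScope{schema.MetricScopeNode, schema.MetricScopeAccelerator})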

View File

@@ -182,7 +182,7 @@ func LoadAverages(
ctx context.Context) error {
if job.State != schema.JobStateRunning && useArchive {
return archive.LoadAveragesFromArchive(job, metrics, data) // #166 change also here?
}
repo, ok := metricDataRepos[job.Cluster]
@@ -190,7 +190,7 @@ func LoadAverages(
return fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", job.Cluster)
}
stats, err := repo.LoadStats(job, metrics, ctx) // #166 how to handle stats for acc normalization?
if err != nil {
log.Errorf("Error while loading statistics for job %v (User %v, Project %v)", job.JobID, job.User, job.Project)
return err

View File

@@ -455,69 +455,6 @@ func (r *JobRepository) DeleteJobById(id int64) error {
return err
}
// TODO: Use node hours instead: SELECT job.user, sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN CAST(strftime('%s', 'now') AS INTEGER) - job.start_time ELSE job.duration END)) as x FROM job GROUP BY user ORDER BY x DESC;
func (r *JobRepository) CountGroupedJobs(
ctx context.Context,
aggreg model.Aggregate,
filters []*model.JobFilter,
weight *model.Weights,
limit *int) (map[string]int, error) {
start := time.Now()
if !aggreg.IsValid() {
return nil, errors.New("invalid aggregate")
}
runner := (sq.BaseRunner)(r.stmtCache)
count := "count(*) as count"
if weight != nil {
switch *weight {
case model.WeightsNodeCount:
count = "sum(job.num_nodes) as count"
case model.WeightsNodeHours:
now := time.Now().Unix()
count = fmt.Sprintf(`sum(job.num_nodes * (CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) as count`, now)
runner = r.DB
default:
log.Debugf("CountGroupedJobs() Weight %v unknown.", *weight)
}
}
q, qerr := SecurityCheck(ctx, sq.Select("job."+string(aggreg), count).From("job").GroupBy("job."+string(aggreg)).OrderBy("count DESC"))
if qerr != nil {
return nil, qerr
}
for _, f := range filters {
q = BuildWhereClause(f, q)
}
if limit != nil {
q = q.Limit(uint64(*limit))
}
counts := map[string]int{}
rows, err := q.RunWith(runner).Query()
if err != nil {
log.Error("Error while running query")
return nil, err
}
for rows.Next() {
var group string
var count int
if err := rows.Scan(&group, &count); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
counts[group] = count
}
log.Debugf("Timer CountGroupedJobs %s", time.Since(start))
return counts, nil
}
func (r *JobRepository) UpdateMonitoringStatus(job int64, monitoringStatus int32) (err error) {
stmt := sq.Update("job").
Set("monitoring_status", monitoringStatus).

View File

@@ -18,13 +18,17 @@ import (
sq "github.com/Masterminds/squirrel"
)
// QueryJobs returns a list of jobs matching the provided filters. page and order are optional.
func (r *JobRepository) QueryJobs(
ctx context.Context,
filters []*model.JobFilter,
page *model.PageRequest,
order *model.OrderByInput) ([]*schema.Job, error) {
query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("job"))
if qerr != nil {
return nil, qerr
}
if order != nil {
field := toSnakeCase(order.Field)
@@ -67,34 +71,15 @@ func (r *JobRepository) queryJobs(
return jobs, nil
}
// testFunction for queryJobs
func (r *JobRepository) testQueryJobs(
filters []*model.JobFilter,
page *model.PageRequest,
order *model.OrderByInput) ([]*schema.Job, error) {
return r.queryJobs(sq.Select(jobColumns...).From("job"), filters, page, order)
}
// CountJobs returns the number of jobs matching the filters, with the security check applied.
func (r *JobRepository) CountJobs(
ctx context.Context,
filters []*model.JobFilter) (int, error) {
query, qerr := SecurityCheck(ctx, sq.Select("count(*)").From("job"))
if qerr != nil {
return 0, qerr
}
for _, f := range filters {
query = BuildWhereClause(f, query)
}
@@ -107,27 +92,6 @@ func (r *JobRepository) countJobs(query sq.SelectBuilder,
return count, nil
}
// testFunction for countJobs
func (r *JobRepository) testCountJobs(
filters []*model.JobFilter) (int, error) {
return r.countJobs(sq.Select("count(*)").From("job"), filters)
}
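With the private variants gone, callers go through the security-checked entry points and must supply a context that carries the requesting user. An illustrative call site; the filter values, page size, and the SortDirectionEnumDesc constant are assumptions based on the generated models, not code from this diff:

// Sketch of a caller; assumes ctx carries a user (see SecurityCheck below),
// r is a *JobRepository, and filters is a []*model.JobFilter.
page := &model.PageRequest{Page: 1, ItemsPerPage: 25}
order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc}
jobs, err := r.QueryJobs(ctx, filters, page, order)
if err != nil {
	return err
}
total, err := r.CountJobs(ctx, filters)
if err != nil {
	return err
}
fmt.Printf("showing %d of %d jobs\n", len(jobs), total)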
func SecurityCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) {
user := GetUserFromContext(ctx)
if user == nil {

View File

@@ -5,10 +5,12 @@
package repository
import (
"context"
"testing"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/log"
"github.com/ClusterCockpit/cc-backend/pkg/schema"
_ "github.com/mattn/go-sqlite3"
)
@@ -94,7 +96,7 @@ func BenchmarkDB_CountJobs(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := db.CountJobs(getContext(b), []*model.JobFilter{filter})
noErr(b, err)
}
})
@@ -118,20 +120,37 @@ func BenchmarkDB_QueryJobs(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := db.QueryJobs(getContext(b), []*model.JobFilter{filter}, page, order)
noErr(b, err)
}
})
})
}
func getContext(tb testing.TB) context.Context {
tb.Helper()
var roles []string
roles = append(roles, schema.GetRoleString(schema.RoleAdmin))
projects := make([]string, 0)
user := &schema.User{
Username: "demo",
Name: "The man",
Roles: roles,
Projects: projects,
AuthSource: schema.AuthViaLDAP,
}
ctx := context.Background()
return context.WithValue(ctx, ContextUserKey, user)
}
func setup(tb testing.TB) *JobRepository {
tb.Helper()
log.Init("warn", true)
dbfile := "testdata/job.db"
err := MigrateDB("sqlite3", dbfile)
noErr(tb, err)
Connect("sqlite3", dbfile)
return GetJobRepository()
}

View File

@@ -23,6 +23,17 @@ var groupBy2column = map[model.Aggregate]string{
model.AggregateCluster: "job.cluster",
}
var sortBy2column = map[model.SortByAggregate]string{
model.SortByAggregateTotaljobs: "totalJobs",
model.SortByAggregateTotalwalltime: "totalWalltime",
model.SortByAggregateTotalnodes: "totalNodes",
model.SortByAggregateTotalnodehours: "totalNodeHours",
model.SortByAggregateTotalcores: "totalCores",
model.SortByAggregateTotalcorehours: "totalCoreHours",
model.SortByAggregateTotalaccs: "totalAccs",
model.SortByAggregateTotalacchours: "totalAccHours",
}
func (r *JobRepository) buildCountQuery(
filter []*model.JobFilter,
kind string,
@@ -60,19 +71,26 @@ func (r *JobRepository) buildStatsQuery(
castType := r.getCastType()
if col != "" {
// Scan columns: id, totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select(col, "COUNT(job.id) as totalJobs",
fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s) as totalWalltime", castType),
fmt.Sprintf("CAST(SUM(job.num_nodes) as %s) as totalNodes", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s) as totalNodeHours", castType),
fmt.Sprintf("CAST(SUM(job.num_hwthreads) as %s) as totalCores", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s) as totalCoreHours", castType),
fmt.Sprintf("CAST(SUM(job.num_acc) as %s) as totalAccs", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s) as totalAccHours", castType),
).From("job").GroupBy(col)
} else {
// Scan columns: totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours
query = sq.Select("COUNT(job.id)",
fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType),
fmt.Sprintf("CAST(SUM(job.num_nodes) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType),
fmt.Sprintf("CAST(SUM(job.num_hwthreads) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType),
fmt.Sprintf("CAST(SUM(job.num_acc) as %s)", castType),
fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s)", castType),
).From("job")
}
@@ -112,16 +130,28 @@ func (r *JobRepository) getCastType() string {
func (r *JobRepository) JobsStatsGrouped(
ctx context.Context,
filter []*model.JobFilter,
page *model.PageRequest,
sortBy *model.SortByAggregate,
groupBy *model.Aggregate) ([]*model.JobsStatistics, error) {
start := time.Now()
col := groupBy2column[*groupBy]
query := r.buildStatsQuery(filter, col)
query, err := SecurityCheck(ctx, query)
if err != nil {
return nil, err
}
if sortBy != nil {
sortBy := sortBy2column[*sortBy]
query = query.OrderBy(fmt.Sprintf("%s DESC", sortBy))
}
if page != nil && page.ItemsPerPage != -1 {
limit := uint64(page.ItemsPerPage)
query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
}
rows, err := query.RunWith(r.DB).Query()
if err != nil {
log.Warn("Error while querying DB for job statistics")
@@ -132,15 +162,36 @@ func (r *JobRepository) JobsStatsGrouped(
for rows.Next() {
var id sql.NullString
var jobs, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
if err := rows.Scan(&id, &jobs, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if id.Valid {
var totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours int
if jobs.Valid {
totalJobs = int(jobs.Int64)
}
if walltime.Valid {
totalWalltime = int(walltime.Int64)
}
if nodes.Valid {
totalNodes = int(nodes.Int64)
}
if cores.Valid {
totalCores = int(cores.Int64)
}
if accs.Valid {
totalAccs = int(accs.Int64)
}
if nodeHours.Valid {
totalNodeHours = int(nodeHours.Int64)
}
if coreHours.Valid {
totalCoreHours = int(coreHours.Int64)
}
@@ -154,9 +205,13 @@ func (r *JobRepository) JobsStatsGrouped(
&model.JobsStatistics{
ID: id.String,
Name: name,
TotalJobs: totalJobs,
TotalWalltime: totalWalltime,
TotalNodes: totalNodes,
TotalNodeHours: totalNodeHours,
TotalCores: totalCores,
TotalCoreHours: totalCoreHours,
TotalAccs: totalAccs,
TotalAccHours: totalAccHours})
} else {
stats = append(stats,
@@ -164,7 +219,11 @@ func (r *JobRepository) JobsStatsGrouped(
ID: id.String,
TotalJobs: int(jobs.Int64),
TotalWalltime: int(walltime.Int64),
TotalNodes: totalNodes,
TotalNodeHours: totalNodeHours,
TotalCores: totalCores,
TotalCoreHours: totalCoreHours,
TotalAccs: totalAccs,
TotalAccHours: totalAccHours})
}
}
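Paging in JobsStatsGrouped is offset-based: page n with k items per page becomes OFFSET (n-1)*k LIMIT k, and ItemsPerPage == -1 disables paging entirely. A sketch of a call that fetches the top ten users by core-hours; the filter values are arbitrary and model.AggregateUser is assumed from the generated enum:

// Top 10 users by total core-hours; Page: 2 would skip the first 10 (OFFSET 10).
page := &model.PageRequest{Page: 1, ItemsPerPage: 10}
sortBy := model.SortByAggregateTotalcorehours
groupBy := model.AggregateUser
stats, err := r.JobsStatsGrouped(ctx, filters, page, &sortBy, &groupBy)
if err != nil {
	return err
}
for _, s := range stats {
	fmt.Printf("%s: %d core-hours\n", s.ID, s.TotalCoreHours)
}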
@@ -188,15 +247,18 @@ func (r *JobRepository) JobsStats(
row := query.RunWith(r.DB).QueryRow()
stats := make([]*model.JobsStatistics, 0, 1)
var jobs, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64
if err := row.Scan(&jobs, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil {
log.Warn("Error while scanning rows")
return nil, err
}
if jobs.Valid {
var totalNodeHours, totalCoreHours, totalAccHours int
if nodeHours.Valid {
totalNodeHours = int(nodeHours.Int64)
}
if coreHours.Valid {
totalCoreHours = int(coreHours.Int64)
}
@@ -207,6 +269,7 @@ func (r *JobRepository) JobsStats(
&model.JobsStatistics{
TotalJobs: int(jobs.Int64),
TotalWalltime: int(walltime.Int64),
TotalNodeHours: totalNodeHours,
TotalCoreHours: totalCoreHours,
TotalAccHours: totalAccHours})
}
@@ -321,7 +384,7 @@ func (r *JobRepository) AddJobCount(
return nil, err
}
var count int
for rows.Next() {
var cnt sql.NullInt64
@@ -329,20 +392,22 @@ func (r *JobRepository) AddJobCount(
log.Warn("Error while scanning rows")
return nil, err
}
count = int(cnt.Int64)
}
switch kind {
case "running":
for _, s := range stats {
s.RunningJobs = count
}
case "short":
for _, s := range stats {
s.ShortJobs = count
}
}
log.Debugf("Timer JobJobCount %s", time.Since(start))
log.Debugf("Timer AddJobCount %s", time.Since(start))
return stats, nil
}
@@ -367,6 +432,18 @@ func (r *JobRepository) AddHistograms(
return nil, err
}
stat.HistNumCores, err = r.jobsStatisticsHistogram(ctx, "job.num_hwthreads as value", filter)
if err != nil {
log.Warn("Error while loading job statistics histogram: num hwthreads")
return nil, err
}
stat.HistNumAccs, err = r.jobsStatisticsHistogram(ctx, "job.num_acc as value", filter)
if err != nil {
log.Warn("Error while loading job statistics histogram: num acc")
return nil, err
}
log.Debugf("Timer AddHistograms %s", time.Since(start))
return stat, nil
}
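The two added histograms mean a client can now request distributions over core and accelerator counts next to the existing duration and node-count histograms; each bucket is a HistoPoint carrying a value and a count. An illustrative request, embedded as a Go string; field spellings follow the JSON tags in the models above:

// Illustrative only; field names mirror the HistoPoint JSON tags.
const histogramQuery = `
query {
  jobsStatistics {
    histDuration { value count }
    histNumNodes { value count }
    histNumCores { value count }
    histNumAccs  { value count }
  }
}`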

View File

@@ -7,6 +7,8 @@ package repository
import (
"fmt"
"testing"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
)
func TestBuildJobStatsQuery(t *testing.T) {
@@ -19,3 +21,15 @@ func TestBuildJobStatsQuery(t *testing.T) {
fmt.Printf("SQL: %s\n", sql)
}
func TestJobStats(t *testing.T) {
r := setup(t)
filter := &model.JobFilter{}
stats, err := r.JobsStats(getContext(t), []*model.JobFilter{filter})
noErr(t, err)
if stats[0].TotalJobs != 6 {
t.Fatalf("Want 98, Got %d", stats[0].TotalJobs)
}
}