From 695e8e270a5b4f5ded99d672d54196a84aacdbd3 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 6 Jun 2023 10:27:55 +0200 Subject: [PATCH 01/12] Split job Repository in multiple files --- internal/repository/job.go | 332 ----------------------------- internal/repository/stats.go | 256 ++++++++++++++++++++++ internal/repository/transaction.go | 104 +++++++++ 3 files changed, 360 insertions(+), 332 deletions(-) create mode 100644 internal/repository/stats.go create mode 100644 internal/repository/transaction.go diff --git a/internal/repository/job.go b/internal/repository/job.go index 1e88c43..1756740 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -14,12 +14,9 @@ import ( "sync" "time" - "github.com/99designs/gqlgen/graphql" "github.com/ClusterCockpit/cc-backend/internal/auth" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/metricdata" - "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/lrucache" "github.com/ClusterCockpit/cc-backend/pkg/schema" @@ -765,242 +762,6 @@ func (r *JobRepository) FindJobsBefore(startTime int64) ([]*schema.Job, error) { return jobs, nil } -// GraphQL validation should make sure that no unkown values can be specified. -var groupBy2column = map[model.Aggregate]string{ - model.AggregateUser: "job.user", - model.AggregateProject: "job.project", - model.AggregateCluster: "job.cluster", -} - -// Helper function for the jobsStatistics GraphQL query placed here so that schema.resolvers.go is not too full. -func (r *JobRepository) JobsStatistics(ctx context.Context, - filter []*model.JobFilter, - groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { - - start := time.Now() - // In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string) - stats := map[string]*model.JobsStatistics{} - var castType string - - switch r.driver { - case "sqlite3": - castType = "int" - case "mysql": - castType = "unsigned" - } - - // `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those. - for _, cluster := range archive.Clusters { - for _, subcluster := range cluster.SubClusters { - corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType) - var rawQuery sq.SelectBuilder - if groupBy == nil { - rawQuery = sq.Select( - "''", - "COUNT(job.id)", - fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), - corehoursCol, - ).From("job") - } else { - col := groupBy2column[*groupBy] - rawQuery = sq.Select( - col, - "COUNT(job.id)", - fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), - corehoursCol, - ).From("job").GroupBy(col) - } - - rawQuery = rawQuery. - Where("job.cluster = ?", cluster.Name). 
- Where("job.subcluster = ?", subcluster.Name) - - query, qerr := SecurityCheck(ctx, rawQuery) - - if qerr != nil { - return nil, qerr - } - - for _, f := range filter { - query = BuildWhereClause(f, query) - } - - rows, err := query.RunWith(r.DB).Query() - if err != nil { - log.Warn("Error while querying DB for job statistics") - return nil, err - } - - for rows.Next() { - var id sql.NullString - var jobs, walltime, corehours sql.NullInt64 - if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil { - log.Warn("Error while scanning rows") - return nil, err - } - - if id.Valid { - if s, ok := stats[id.String]; ok { - s.TotalJobs += int(jobs.Int64) - s.TotalWalltime += int(walltime.Int64) - s.TotalCoreHours += int(corehours.Int64) - } else { - stats[id.String] = &model.JobsStatistics{ - ID: id.String, - TotalJobs: int(jobs.Int64), - TotalWalltime: int(walltime.Int64), - TotalCoreHours: int(corehours.Int64), - } - } - } - } - - } - } - - if groupBy == nil { - - query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < ?", config.Keys.ShortRunningJobsDuration) - query, qerr := SecurityCheck(ctx, query) - - if qerr != nil { - return nil, qerr - } - - for _, f := range filter { - query = BuildWhereClause(f, query) - } - if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil { - log.Warn("Error while scanning rows for short job stats") - return nil, err - } - } else { - col := groupBy2column[*groupBy] - - query := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < ?", config.Keys.ShortRunningJobsDuration) - - query, qerr := SecurityCheck(ctx, query) - - if qerr != nil { - return nil, qerr - } - - for _, f := range filter { - query = BuildWhereClause(f, query) - } - rows, err := query.RunWith(r.DB).Query() - if err != nil { - log.Warn("Error while querying jobs for short jobs") - return nil, err - } - - for rows.Next() { - var id sql.NullString - var shortJobs sql.NullInt64 - if err := rows.Scan(&id, &shortJobs); err != nil { - log.Warn("Error while scanning rows for short jobs") - return nil, err - } - - if id.Valid { - stats[id.String].ShortJobs = int(shortJobs.Int64) - } - } - - if col == "job.user" { - for id := range stats { - emptyDash := "-" - user := auth.GetUser(ctx) - name, _ := r.FindColumnValue(user, id, "user", "name", "username", false) - if name != "" { - stats[id].Name = &name - } else { - stats[id].Name = &emptyDash - } - } - } - } - - // Calculating the histogram data is expensive, so only do it if needed. - // An explicit resolver can not be used because we need to know the filters. 
- histogramsNeeded := false - fields := graphql.CollectFieldsCtx(ctx, nil) - for _, col := range fields { - if col.Name == "histDuration" || col.Name == "histNumNodes" { - histogramsNeeded = true - } - } - - res := make([]*model.JobsStatistics, 0, len(stats)) - for _, stat := range stats { - res = append(res, stat) - id, col := "", "" - if groupBy != nil { - id = stat.ID - col = groupBy2column[*groupBy] - } - - if histogramsNeeded { - var err error - value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType) - stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter, id, col) - if err != nil { - log.Warn("Error while loading job statistics histogram: running jobs") - return nil, err - } - - stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, id, col) - if err != nil { - log.Warn("Error while loading job statistics histogram: num nodes") - return nil, err - } - } - } - - log.Infof("Timer JobStatistics %s", time.Since(start)) - return res, nil -} - -// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used -// to add a condition to the query of the kind " = ". -func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context, - value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) { - - start := time.Now() - query, qerr := SecurityCheck(ctx, sq.Select(value, "COUNT(job.id) AS count").From("job")) - - if qerr != nil { - return nil, qerr - } - - for _, f := range filters { - query = BuildWhereClause(f, query) - } - - if len(id) != 0 && len(col) != 0 { - query = query.Where(col+" = ?", id) - } - - rows, err := query.GroupBy("value").RunWith(r.DB).Query() - if err != nil { - log.Error("Error while running query") - return nil, err - } - - points := make([]*model.HistoPoint, 0) - for rows.Next() { - point := model.HistoPoint{} - if err := rows.Scan(&point.Value, &point.Count); err != nil { - log.Warn("Error while scanning rows") - return nil, err - } - - points = append(points, &point) - } - log.Infof("Timer jobsStatisticsHistogram %s", time.Since(start)) - return points, nil -} - const NamedJobInsert string = `INSERT INTO job ( job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc, exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data, @@ -1025,96 +786,3 @@ func (r *JobRepository) InsertJob(job *schema.Job) (int64, error) { return id, nil } - -type Transaction struct { - tx *sqlx.Tx - stmt *sqlx.NamedStmt -} - -func (r *JobRepository) TransactionInit() (*Transaction, error) { - var err error - t := new(Transaction) - // Inserts are bundled into transactions because in sqlite, - // that speeds up inserts A LOT. 
-	t.tx, err = r.DB.Beginx()
-	if err != nil {
-		log.Warn("Error while bundling transactions")
-		return nil, err
-	}
-
-	t.stmt, err = t.tx.PrepareNamed(NamedJobInsert)
-	if err != nil {
-		log.Warn("Error while preparing namedJobInsert")
-		return nil, err
-	}
-
-	return t, nil
-}
-
-func (r *JobRepository) TransactionCommit(t *Transaction) error {
-	var err error
-	if t.tx != nil {
-		if err = t.tx.Commit(); err != nil {
-			log.Warn("Error while committing transactions")
-			return err
-		}
-	}
-
-	t.tx, err = r.DB.Beginx()
-	if err != nil {
-		log.Warn("Error while bundling transactions")
-		return err
-	}
-
-	t.stmt = t.tx.NamedStmt(t.stmt)
-	return nil
-}
-
-func (r *JobRepository) TransactionEnd(t *Transaction) error {
-	if err := t.tx.Commit(); err != nil {
-		log.Warn("Error while committing SQL transactions")
-		return err
-	}
-
-	return nil
-}
-
-func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, error) {
-	res, err := t.stmt.Exec(job)
-	if err != nil {
-		log.Errorf("repository initDB(): %v", err)
-		return 0, err
-	}
-
-	id, err := res.LastInsertId()
-	if err != nil {
-		log.Errorf("repository initDB(): %v", err)
-		return 0, err
-	}
-
-	return id, nil
-}
-
-func (r *JobRepository) TransactionAddTag(t *Transaction, tag *schema.Tag) (int64, error) {
-	res, err := t.tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type)
-	if err != nil {
-		log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type)
-		return 0, err
-	}
-	tagId, err := res.LastInsertId()
-	if err != nil {
-		log.Warn("Error while getting last insert ID")
-		return 0, err
-	}
-
-	return tagId, nil
-}
-
-func (r *JobRepository) TransactionSetTag(t *Transaction, jobId int64, tagId int64) error {
-	if _, err := t.tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobId, tagId); err != nil {
-		log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", jobId, tagId)
-		return err
-	}
-
-	return nil
-}
diff --git a/internal/repository/stats.go b/internal/repository/stats.go
new file mode 100644
index 0000000..5ea589f
--- /dev/null
+++ b/internal/repository/stats.go
@@ -0,0 +1,256 @@
+// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg.
+// All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+package repository
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"time"
+
+	"github.com/99designs/gqlgen/graphql"
+	"github.com/ClusterCockpit/cc-backend/internal/auth"
+	"github.com/ClusterCockpit/cc-backend/internal/config"
+	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
+	"github.com/ClusterCockpit/cc-backend/pkg/archive"
+	"github.com/ClusterCockpit/cc-backend/pkg/log"
+	sq "github.com/Masterminds/squirrel"
+)
+
+// GraphQL validation should make sure that no unknown values can be specified.
+var groupBy2column = map[model.Aggregate]string{
+	model.AggregateUser:    "job.user",
+	model.AggregateProject: "job.project",
+	model.AggregateCluster: "job.cluster",
+}
+
+// Helper function for the jobsStatistics GraphQL query placed here so that schema.resolvers.go is not too full.
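+// With a nil `groupBy`, the single aggregate row is stored under the
+// empty-string key; otherwise one row is kept per user/project/cluster.
+//
+// A minimal sketch of a GraphQL query this serves (the USER enum value is
+// assumed from model.AggregateUser; field names follow the schema):
+//
+//	query {
+//	  jobsStatistics(groupBy: USER) {
+//	    id
++//	    totalJobs
+//	    totalWalltime
+//	    totalCoreHours
+//	  }
+//	}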
+func (r *JobRepository) JobsStatistics(ctx context.Context, + filter []*model.JobFilter, + groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { + + start := time.Now() + // In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string) + stats := map[string]*model.JobsStatistics{} + var castType string + + switch r.driver { + case "sqlite3": + castType = "int" + case "mysql": + castType = "unsigned" + } + + // `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those. + for _, cluster := range archive.Clusters { + for _, subcluster := range cluster.SubClusters { + corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType) + var rawQuery sq.SelectBuilder + if groupBy == nil { + rawQuery = sq.Select( + "''", + "COUNT(job.id)", + fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), + corehoursCol, + ).From("job") + } else { + col := groupBy2column[*groupBy] + rawQuery = sq.Select( + col, + "COUNT(job.id)", + fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), + corehoursCol, + ).From("job").GroupBy(col) + } + + rawQuery = rawQuery. + Where("job.cluster = ?", cluster.Name). + Where("job.subcluster = ?", subcluster.Name) + + query, qerr := SecurityCheck(ctx, rawQuery) + + if qerr != nil { + return nil, qerr + } + + for _, f := range filter { + query = BuildWhereClause(f, query) + } + + rows, err := query.RunWith(r.DB).Query() + if err != nil { + log.Warn("Error while querying DB for job statistics") + return nil, err + } + + for rows.Next() { + var id sql.NullString + var jobs, walltime, corehours sql.NullInt64 + if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + + if id.Valid { + if s, ok := stats[id.String]; ok { + s.TotalJobs += int(jobs.Int64) + s.TotalWalltime += int(walltime.Int64) + s.TotalCoreHours += int(corehours.Int64) + } else { + stats[id.String] = &model.JobsStatistics{ + ID: id.String, + TotalJobs: int(jobs.Int64), + TotalWalltime: int(walltime.Int64), + TotalCoreHours: int(corehours.Int64), + } + } + } + } + + } + } + + if groupBy == nil { + + query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < ?", config.Keys.ShortRunningJobsDuration) + query, qerr := SecurityCheck(ctx, query) + + if qerr != nil { + return nil, qerr + } + + for _, f := range filter { + query = BuildWhereClause(f, query) + } + if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil { + log.Warn("Error while scanning rows for short job stats") + return nil, err + } + } else { + col := groupBy2column[*groupBy] + + query := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < ?", config.Keys.ShortRunningJobsDuration) + + query, qerr := SecurityCheck(ctx, query) + + if qerr != nil { + return nil, qerr + } + + for _, f := range filter { + query = BuildWhereClause(f, query) + } + rows, err := query.RunWith(r.DB).Query() + if err != nil { + log.Warn("Error while querying jobs for short jobs") + return nil, err + } + + for rows.Next() { + var id sql.NullString + var shortJobs sql.NullInt64 + if err := rows.Scan(&id, &shortJobs); err != nil { + log.Warn("Error while scanning rows for short jobs") + return nil, err + } + + if id.Valid { + stats[id.String].ShortJobs = int(shortJobs.Int64) + } + } + + if col == "job.user" { + for id := range stats { 
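+				// Resolve the user ID to the account owner's display name;
+				// fall back to a dash when no name is stored for the account.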
+ emptyDash := "-" + user := auth.GetUser(ctx) + name, _ := r.FindColumnValue(user, id, "user", "name", "username", false) + if name != "" { + stats[id].Name = &name + } else { + stats[id].Name = &emptyDash + } + } + } + } + + // Calculating the histogram data is expensive, so only do it if needed. + // An explicit resolver can not be used because we need to know the filters. + histogramsNeeded := false + fields := graphql.CollectFieldsCtx(ctx, nil) + for _, col := range fields { + if col.Name == "histDuration" || col.Name == "histNumNodes" { + histogramsNeeded = true + } + } + + res := make([]*model.JobsStatistics, 0, len(stats)) + for _, stat := range stats { + res = append(res, stat) + id, col := "", "" + if groupBy != nil { + id = stat.ID + col = groupBy2column[*groupBy] + } + + if histogramsNeeded { + var err error + value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType) + stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter, id, col) + if err != nil { + log.Warn("Error while loading job statistics histogram: running jobs") + return nil, err + } + + stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, id, col) + if err != nil { + log.Warn("Error while loading job statistics histogram: num nodes") + return nil, err + } + } + } + + log.Infof("Timer JobStatistics %s", time.Since(start)) + return res, nil +} + +// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used +// to add a condition to the query of the kind " = ". +func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context, + value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) { + + start := time.Now() + query, qerr := SecurityCheck(ctx, sq.Select(value, "COUNT(job.id) AS count").From("job")) + + if qerr != nil { + return nil, qerr + } + + for _, f := range filters { + query = BuildWhereClause(f, query) + } + + if len(id) != 0 && len(col) != 0 { + query = query.Where(col+" = ?", id) + } + + rows, err := query.GroupBy("value").RunWith(r.DB).Query() + if err != nil { + log.Error("Error while running query") + return nil, err + } + + points := make([]*model.HistoPoint, 0) + for rows.Next() { + point := model.HistoPoint{} + if err := rows.Scan(&point.Value, &point.Count); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + + points = append(points, &point) + } + log.Infof("Timer jobsStatisticsHistogram %s", time.Since(start)) + return points, nil +} diff --git a/internal/repository/transaction.go b/internal/repository/transaction.go new file mode 100644 index 0000000..4d003d7 --- /dev/null +++ b/internal/repository/transaction.go @@ -0,0 +1,104 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package repository + +import ( + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" + "github.com/jmoiron/sqlx" +) + +type Transaction struct { + tx *sqlx.Tx + stmt *sqlx.NamedStmt +} + +func (r *JobRepository) TransactionInit() (*Transaction, error) { + var err error + t := new(Transaction) + // Inserts are bundled into transactions because in sqlite, + // that speeds up inserts A LOT. 
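+	// The named prepared statement for NamedJobInsert is created once per
+	// transaction and reused by every TransactionAdd call; TransactionCommit
+	// re-binds it after opening the next transaction.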
+ t.tx, err = r.DB.Beginx() + if err != nil { + log.Warn("Error while bundling transactions") + return nil, err + } + + t.stmt, err = t.tx.PrepareNamed(NamedJobInsert) + if err != nil { + log.Warn("Error while preparing namedJobInsert") + return nil, err + } + + return t, nil +} + +func (r *JobRepository) TransactionCommit(t *Transaction) error { + var err error + if t.tx != nil { + if err = t.tx.Commit(); err != nil { + log.Warn("Error while committing transactions") + return err + } + } + + t.tx, err = r.DB.Beginx() + if err != nil { + log.Warn("Error while bundling transactions") + return err + } + + t.stmt = t.tx.NamedStmt(t.stmt) + return nil +} + +func (r *JobRepository) TransactionEnd(t *Transaction) error { + if err := t.tx.Commit(); err != nil { + log.Warn("Error while committing SQL transactions") + return err + } + + return nil +} + +func (r *JobRepository) TransactionAdd(t *Transaction, job schema.Job) (int64, error) { + res, err := t.stmt.Exec(job) + if err != nil { + log.Errorf("repository initDB(): %v", err) + return 0, err + } + + id, err := res.LastInsertId() + if err != nil { + log.Errorf("repository initDB(): %v", err) + return 0, err + } + + return id, nil +} + +func (r *JobRepository) TransactionAddTag(t *Transaction, tag *schema.Tag) (int64, error) { + res, err := t.tx.Exec(`INSERT INTO tag (tag_name, tag_type) VALUES (?, ?)`, tag.Name, tag.Type) + if err != nil { + log.Errorf("Error while inserting tag into tag table: %v (Type %v)", tag.Name, tag.Type) + return 0, err + } + tagId, err := res.LastInsertId() + if err != nil { + log.Warn("Error while getting last insert ID") + return 0, err + } + + return tagId, nil +} + +func (r *JobRepository) TransactionSetTag(t *Transaction, jobId int64, tagId int64) error { + if _, err := t.tx.Exec(`INSERT INTO jobtag (job_id, tag_id) VALUES (?, ?)`, jobId, tagId); err != nil { + log.Errorf("Error while inserting jobtag into jobtag table: %v (TagID %v)", jobId, tagId) + return err + } + + return nil +} From 784ae125be014a58273c8049c5af656225d6a9b5 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 6 Jun 2023 13:13:08 +0200 Subject: [PATCH 02/12] Reformatting --- internal/graph/schema.resolvers.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index a24fac0..c558d54 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -239,12 +239,20 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str } // JobsFootprints is the resolver for the jobsFootprints field. -func (r *queryResolver) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) { +func (r *queryResolver) JobsFootprints( + ctx context.Context, + filter []*model.JobFilter, + metrics []string) (*model.Footprints, error) { + return r.jobsFootprints(ctx, filter, metrics) } // Jobs is the resolver for the jobs field. 
-func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) { +func (r *queryResolver) Jobs( + ctx context.Context, + filter []*model.JobFilter, + page *model.PageRequest, + order *model.OrderByInput) (*model.JobResultList, error) { if page == nil { page = &model.PageRequest{ ItemsPerPage: 50, @@ -268,12 +276,22 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag } // JobsStatistics is the resolver for the jobsStatistics field. -func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { +func (r *queryResolver) JobsStatistics( + ctx context.Context, + filter []*model.JobFilter, + groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { + return r.Repo.JobsStatistics(ctx, filter, groupBy) } // JobsCount is the resolver for the jobsCount field. -func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) { +func (r *queryResolver) JobsCount( + ctx context.Context, + filter []*model.JobFilter, + groupBy model.Aggregate, + weight *model.Weights, + limit *int) ([]*model.Count, error) { + counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, weight, limit) if err != nil { log.Warn("Error while counting grouped jobs") From becb2bfa3a5967daad12388e2bc63a287ccbbc92 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 7 Jun 2023 11:58:58 +0200 Subject: [PATCH 03/12] Refactor Jobs stats resolver --- internal/graph/schema.resolvers.go | 58 ++++- internal/repository/stats.go | 382 ++++++++++++++++++----------- internal/repository/stats_test.go | 24 ++ 3 files changed, 317 insertions(+), 147 deletions(-) create mode 100644 internal/repository/stats_test.go diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index a24fac0..c2a2ed7 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -11,6 +11,7 @@ import ( "strconv" "time" + "github.com/99designs/gqlgen/graphql" "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/graph/generated" "github.com/ClusterCockpit/cc-backend/internal/graph/model" @@ -38,7 +39,6 @@ func (r *jobResolver) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, // ConcurrentJobs is the resolver for the concurrentJobs field. func (r *jobResolver) ConcurrentJobs(ctx context.Context, obj *schema.Job) (*model.JobLinkResultList, error) { - exc := int(obj.Exclusive) if exc != 1 { filter := []*model.JobFilter{} @@ -269,7 +269,43 @@ func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, pag // JobsStatistics is the resolver for the jobsStatistics field. 
func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { - return r.Repo.JobsStatistics(ctx, filter, groupBy) + var err error + var stats []*model.JobsStatistics + + if requireField(ctx, "TotalJobs") { + if requireField(ctx, "TotalCoreHours") { + if groupBy == nil { + stats, err = r.Repo.JobsStatsPlain(ctx, filter) + } else { + stats, err = r.Repo.JobsStats(ctx, filter, groupBy) + } + } else { + if groupBy == nil { + stats, err = r.Repo.JobsStatsPlainNoCoreH(ctx, filter) + } else { + stats, err = r.Repo.JobsStatsNoCoreH(ctx, filter, groupBy) + } + } + } else { + stats = make([]*model.JobsStatistics, 0, 1) + } + + if err != nil { + return nil, err + } + + if requireField(ctx, "histDuration") || requireField(ctx, "histNumNodes") { + if groupBy == nil { + stats[0], err = r.Repo.AddHistograms(ctx, filter, stats[0]) + if err != nil { + return nil, err + } + } else { + return nil, errors.New("histograms only implemented without groupBy argument") + } + } + + return stats, nil } // JobsCount is the resolver for the jobsCount field. @@ -367,3 +403,21 @@ type jobResolver struct{ *Resolver } type mutationResolver struct{ *Resolver } type queryResolver struct{ *Resolver } type subClusterResolver struct{ *Resolver } + +// !!! WARNING !!! +// The code below was going to be deleted when updating resolvers. It has been copied here so you have +// one last chance to move it out of harms way if you want. There are two reasons this happens: +// - When renaming or deleting a resolver the old code will be put in here. You can safely delete +// it when you're done. +// - You have helper methods in this file. Move them out to keep these resolver files clean. +func requireField(ctx context.Context, name string) bool { + fields := graphql.CollectAllFields(ctx) + + for _, f := range fields { + if f == name { + return true + } + } + + return false +} diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 5ea589f..9ec646f 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -10,9 +10,7 @@ import ( "fmt" "time" - "github.com/99designs/gqlgen/graphql" "github.com/ClusterCockpit/cc-backend/internal/auth" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" @@ -26,14 +24,43 @@ var groupBy2column = map[model.Aggregate]string{ model.AggregateCluster: "job.cluster", } -// Helper function for the jobsStatistics GraphQL query placed here so that schema.resolvers.go is not too full. 
-func (r *JobRepository) JobsStatistics(ctx context.Context, +func (r *JobRepository) buildJobsStatsQuery( filter []*model.JobFilter, - groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { + col string) sq.SelectBuilder { - start := time.Now() - // In case `groupBy` is nil (not used), the model.JobsStatistics used is at the key '' (empty string) - stats := map[string]*model.JobsStatistics{} + var query sq.SelectBuilder + castType := r.getCastType() + + if col != "" { + // Scan columns: id, totalJobs, totalWalltime + query = sq.Select(col, "COUNT(job.id)", + fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), + ).From("job").GroupBy(col) + } else { + // Scan columns: totalJobs, totalWalltime + query = sq.Select("COUNT(job.id)", + fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), + ).From("job") + } + + for _, f := range filter { + query = BuildWhereClause(f, query) + } + + return query +} + +func (r *JobRepository) getUserName(ctx context.Context, id string) string { + user := auth.GetUser(ctx) + name, _ := r.FindColumnValue(user, id, "user", "name", "username", false) + if name != "" { + return name + } else { + return "-" + } +} + +func (r *JobRepository) getCastType() string { var castType string switch r.driver { @@ -41,45 +68,175 @@ func (r *JobRepository) JobsStatistics(ctx context.Context, castType = "int" case "mysql": castType = "unsigned" + default: + castType = "" } - // `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those. + return castType +} + +// with groupBy and without coreHours +func (r *JobRepository) JobsStatsNoCoreH( + ctx context.Context, + filter []*model.JobFilter, + groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { + + start := time.Now() + col := groupBy2column[*groupBy] + query := r.buildJobsStatsQuery(filter, col) + query, err := SecurityCheck(ctx, query) + if err != nil { + return nil, err + } + + rows, err := query.RunWith(r.DB).Query() + if err != nil { + log.Warn("Error while querying DB for job statistics") + return nil, err + } + + stats := make([]*model.JobsStatistics, 0, 100) + + for rows.Next() { + var id sql.NullString + var jobs, walltime sql.NullInt64 + if err := rows.Scan(&id, &jobs, &walltime); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + + if id.Valid { + if col == "job.user" { + name := r.getUserName(ctx, id.String) + stats = append(stats, + &model.JobsStatistics{ + ID: id.String, + Name: &name, + TotalJobs: int(jobs.Int64), + TotalWalltime: int(walltime.Int64)}) + } else { + stats = append(stats, + &model.JobsStatistics{ + ID: id.String, + TotalJobs: int(jobs.Int64), + TotalWalltime: int(walltime.Int64)}) + } + } + } + + log.Infof("Timer JobStatistics %s", time.Since(start)) + return stats, nil +} + +// without groupBy and without coreHours +func (r *JobRepository) JobsStatsPlainNoCoreH( + ctx context.Context, + filter []*model.JobFilter) ([]*model.JobsStatistics, error) { + + start := time.Now() + query := r.buildJobsStatsQuery(filter, "") + query, err := SecurityCheck(ctx, query) + if err != nil { + return nil, err + } + + row := query.RunWith(r.DB).QueryRow() + stats := make([]*model.JobsStatistics, 0, 1) + var jobs, walltime sql.NullInt64 + if err := row.Scan(&jobs, &walltime); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + + if jobs.Valid { + stats = append(stats, + &model.JobsStatistics{ + TotalJobs: int(jobs.Int64), + TotalWalltime: int(walltime.Int64)}) + } + + 
log.Infof("Timer JobStatistics %s", time.Since(start)) + return stats, nil +} + +// without groupBy and with coreHours +func (r *JobRepository) JobsStatsPlain( + ctx context.Context, + filter []*model.JobFilter) ([]*model.JobsStatistics, error) { + + start := time.Now() + query := r.buildJobsStatsQuery(filter, "") + query, err := SecurityCheck(ctx, query) + if err != nil { + return nil, err + } + + castType := r.getCastType() + var totalJobs, totalWalltime, totalCoreHours int64 + for _, cluster := range archive.Clusters { for _, subcluster := range cluster.SubClusters { - corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType) - var rawQuery sq.SelectBuilder - if groupBy == nil { - rawQuery = sq.Select( - "''", - "COUNT(job.id)", - fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), - corehoursCol, - ).From("job") - } else { - col := groupBy2column[*groupBy] - rawQuery = sq.Select( - col, - "COUNT(job.id)", - fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), - corehoursCol, - ).From("job").GroupBy(col) - } - rawQuery = rawQuery. - Where("job.cluster = ?", cluster.Name). + scQuery := query.Column(fmt.Sprintf( + "CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", + subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType)) + scQuery = scQuery.Where("job.cluster = ?", cluster.Name). Where("job.subcluster = ?", subcluster.Name) - query, qerr := SecurityCheck(ctx, rawQuery) - - if qerr != nil { - return nil, qerr + row := scQuery.RunWith(r.DB).QueryRow() + var jobs, walltime, corehours sql.NullInt64 + if err := row.Scan(&jobs, &walltime, &corehours); err != nil { + log.Warn("Error while scanning rows") + return nil, err } - for _, f := range filter { - query = BuildWhereClause(f, query) + if jobs.Valid { + totalJobs += jobs.Int64 + totalWalltime += walltime.Int64 + totalCoreHours += corehours.Int64 } + } + } + stats := make([]*model.JobsStatistics, 0, 1) + stats = append(stats, + &model.JobsStatistics{ + TotalJobs: int(totalJobs), + TotalWalltime: int(totalWalltime), + TotalCoreHours: int(totalCoreHours)}) - rows, err := query.RunWith(r.DB).Query() + log.Infof("Timer JobStatistics %s", time.Since(start)) + return stats, nil +} + +// with groupBy and with coreHours +func (r *JobRepository) JobsStats( + ctx context.Context, + filter []*model.JobFilter, + groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { + + start := time.Now() + + stats := map[string]*model.JobsStatistics{} + col := groupBy2column[*groupBy] + query := r.buildJobsStatsQuery(filter, col) + query, err := SecurityCheck(ctx, query) + if err != nil { + return nil, err + } + + castType := r.getCastType() + + for _, cluster := range archive.Clusters { + for _, subcluster := range cluster.SubClusters { + + scQuery := query.Column(fmt.Sprintf( + "CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", + subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType)) + + scQuery = scQuery.Where("job.cluster = ?", cluster.Name). 
+ Where("job.subcluster = ?", subcluster.Name) + + rows, err := scQuery.RunWith(r.DB).Query() if err != nil { log.Warn("Error while querying DB for job statistics") return nil, err @@ -93,11 +250,20 @@ func (r *JobRepository) JobsStatistics(ctx context.Context, return nil, err } - if id.Valid { - if s, ok := stats[id.String]; ok { - s.TotalJobs += int(jobs.Int64) - s.TotalWalltime += int(walltime.Int64) - s.TotalCoreHours += int(corehours.Int64) + if s, ok := stats[id.String]; ok { + s.TotalJobs += int(jobs.Int64) + s.TotalWalltime += int(walltime.Int64) + s.TotalCoreHours += int(corehours.Int64) + } else { + if col == "job.user" { + name := r.getUserName(ctx, id.String) + stats[id.String] = &model.JobsStatistics{ + ID: id.String, + Name: &name, + TotalJobs: int(jobs.Int64), + TotalWalltime: int(walltime.Int64), + TotalCoreHours: int(corehours.Int64), + } } else { stats[id.String] = &model.JobsStatistics{ ID: id.String, @@ -108,120 +274,50 @@ func (r *JobRepository) JobsStatistics(ctx context.Context, } } } - - } - } - - if groupBy == nil { - - query := sq.Select("COUNT(job.id)").From("job").Where("job.duration < ?", config.Keys.ShortRunningJobsDuration) - query, qerr := SecurityCheck(ctx, query) - - if qerr != nil { - return nil, qerr - } - - for _, f := range filter { - query = BuildWhereClause(f, query) - } - if err := query.RunWith(r.DB).QueryRow().Scan(&(stats[""].ShortJobs)); err != nil { - log.Warn("Error while scanning rows for short job stats") - return nil, err - } - } else { - col := groupBy2column[*groupBy] - - query := sq.Select(col, "COUNT(job.id)").From("job").Where("job.duration < ?", config.Keys.ShortRunningJobsDuration) - - query, qerr := SecurityCheck(ctx, query) - - if qerr != nil { - return nil, qerr - } - - for _, f := range filter { - query = BuildWhereClause(f, query) - } - rows, err := query.RunWith(r.DB).Query() - if err != nil { - log.Warn("Error while querying jobs for short jobs") - return nil, err - } - - for rows.Next() { - var id sql.NullString - var shortJobs sql.NullInt64 - if err := rows.Scan(&id, &shortJobs); err != nil { - log.Warn("Error while scanning rows for short jobs") - return nil, err - } - - if id.Valid { - stats[id.String].ShortJobs = int(shortJobs.Int64) - } - } - - if col == "job.user" { - for id := range stats { - emptyDash := "-" - user := auth.GetUser(ctx) - name, _ := r.FindColumnValue(user, id, "user", "name", "username", false) - if name != "" { - stats[id].Name = &name - } else { - stats[id].Name = &emptyDash - } - } - } - } - - // Calculating the histogram data is expensive, so only do it if needed. - // An explicit resolver can not be used because we need to know the filters. 
- histogramsNeeded := false - fields := graphql.CollectFieldsCtx(ctx, nil) - for _, col := range fields { - if col.Name == "histDuration" || col.Name == "histNumNodes" { - histogramsNeeded = true } } res := make([]*model.JobsStatistics, 0, len(stats)) for _, stat := range stats { res = append(res, stat) - id, col := "", "" - if groupBy != nil { - id = stat.ID - col = groupBy2column[*groupBy] - } - - if histogramsNeeded { - var err error - value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType) - stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter, id, col) - if err != nil { - log.Warn("Error while loading job statistics histogram: running jobs") - return nil, err - } - - stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter, id, col) - if err != nil { - log.Warn("Error while loading job statistics histogram: num nodes") - return nil, err - } - } } log.Infof("Timer JobStatistics %s", time.Since(start)) return res, nil } -// `value` must be the column grouped by, but renamed to "value". `id` and `col` can optionally be used -// to add a condition to the query of the kind " = ". -func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context, - value string, filters []*model.JobFilter, id, col string) ([]*model.HistoPoint, error) { +func (r *JobRepository) AddHistograms( + ctx context.Context, + filter []*model.JobFilter, + stat *model.JobsStatistics) (*model.JobsStatistics, error) { + + castType := r.getCastType() + var err error + value := fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType) + stat.HistDuration, err = r.jobsStatisticsHistogram(ctx, value, filter) + if err != nil { + log.Warn("Error while loading job statistics histogram: running jobs") + return nil, err + } + + stat.HistNumNodes, err = r.jobsStatisticsHistogram(ctx, "job.num_nodes as value", filter) + if err != nil { + log.Warn("Error while loading job statistics histogram: num nodes") + return nil, err + } + + return stat, nil +} + +// `value` must be the column grouped by, but renamed to "value" +func (r *JobRepository) jobsStatisticsHistogram( + ctx context.Context, + value string, + filters []*model.JobFilter) ([]*model.HistoPoint, error) { start := time.Now() - query, qerr := SecurityCheck(ctx, sq.Select(value, "COUNT(job.id) AS count").From("job")) + query, qerr := SecurityCheck(ctx, + sq.Select(value, "COUNT(job.id) AS count").From("job")) if qerr != nil { return nil, qerr @@ -231,10 +327,6 @@ func (r *JobRepository) jobsStatisticsHistogram(ctx context.Context, query = BuildWhereClause(f, query) } - if len(id) != 0 && len(col) != 0 { - query = query.Where(col+" = ?", id) - } - rows, err := query.GroupBy("value").RunWith(r.DB).Query() if err != nil { log.Error("Error while running query") diff --git a/internal/repository/stats_test.go b/internal/repository/stats_test.go new file mode 100644 index 0000000..6ed485b --- /dev/null +++ b/internal/repository/stats_test.go @@ -0,0 +1,24 @@ +// Copyright (C) 2022 NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+package repository
+
+import (
+	"fmt"
+	"testing"
+)
+
+func TestBuildJobStatsQuery(t *testing.T) {
+	r := setup(t)
+	q := r.buildJobsStatsQuery(nil, "USER")
+
+	sql, _, err := q.ToSql()
+	noErr(t, err)
+
+	fmt.Printf("SQL: %s\n", sql)
+
+	if sql == "" {
+		t.Errorf("expected a non-empty SQL string for the job stats query")
+	}
+}
From fb86ebdbcc1c018e9de5a9d90c6fba0fdf88abca Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Thu, 8 Jun 2023 06:18:19 +0200
Subject: [PATCH 04/12] Renaming

---
 internal/graph/schema.resolvers.go |   4 +-
 internal/repository/stats.go       | 124 +++++++++++++++++++++++++++--
 2 files changed, 120 insertions(+), 8 deletions(-)

diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go
index 94d8727..c881a78 100644
--- a/internal/graph/schema.resolvers.go
+++ b/internal/graph/schema.resolvers.go
@@ -280,8 +280,8 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF
 	var err error
 	var stats []*model.JobsStatistics
 
-	if requireField(ctx, "TotalJobs") {
-		if requireField(ctx, "TotalCoreHours") {
+	if requireField(ctx, "totalJobs") {
+		if requireField(ctx, "totalCoreHours") {
 			if groupBy == nil {
 				stats, err = r.Repo.JobsStatsPlain(ctx, filter)
 			} else {
diff --git a/internal/repository/stats.go b/internal/repository/stats.go
index 9ec646f..2197a79 100644
--- a/internal/repository/stats.go
+++ b/internal/repository/stats.go
@@ -11,6 +11,7 @@ import (
 	"time"
 
 	"github.com/ClusterCockpit/cc-backend/internal/auth"
+	"github.com/ClusterCockpit/cc-backend/internal/config"
 	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	"github.com/ClusterCockpit/cc-backend/pkg/log"
@@ -24,7 +25,36 @@ var groupBy2column = map[model.Aggregate]string{
 	model.AggregateCluster: "job.cluster",
 }
 
-func (r *JobRepository) buildJobsStatsQuery(
+func (r *JobRepository) buildCountQuery(
+	filter []*model.JobFilter,
+	kind string,
+	col string) sq.SelectBuilder {
+
+	var query sq.SelectBuilder
+
+	if col != "" {
+		// Scan columns: id, cnt
+		query = sq.Select(col, "COUNT(job.id)").From("job").GroupBy(col)
+	} else {
+		// Scan columns: cnt
+		query = sq.Select("COUNT(job.id)").From("job")
+	}
+
+	switch kind {
+	case "running":
+		query = query.Where("job.job_state = ?", "running")
+	case "short":
+		query = query.Where("job.duration < ?", config.Keys.ShortRunningJobsDuration)
+	}
+
+	for _, f := range filter {
+		query = BuildWhereClause(f, query)
+	}
+
+	return query
+}
+
+func (r *JobRepository) buildStatsQuery(
 	filter []*model.JobFilter,
 	col string) sq.SelectBuilder {
 
@@ -113,7 +143,7 @@ func (r *JobRepository) JobsStatsNoCoreH(
 
 	start := time.Now()
 	col := groupBy2column[*groupBy]
-	query := r.buildJobsStatsQuery(filter, col)
+	query := r.buildStatsQuery(filter, col)
 	query, err := SecurityCheck(ctx, query)
 	if err != nil {
 		return nil, err
@@ -135,26 +165,37 @@ func (r *JobRepository) JobsStatsPlainNoCoreH(
 	filter []*model.JobFilter) ([]*model.JobsStatistics, error) {
 
 	start := time.Now()
-	query := r.buildJobsStatsQuery(filter, "")
+	query := r.buildStatsQuery(filter, "")
 	query, err := SecurityCheck(ctx, query)
 	if err != nil {
 		return nil, err
 	}
 
 	row := query.RunWith(r.DB).QueryRow()
 	stats := make([]*model.JobsStatistics, 0, 1)
 	var jobs, walltime sql.NullInt64
 	if err := row.Scan(&jobs, &walltime); err != nil {
 		log.Warn("Error while scanning rows")
 		return nil, err
 	}
 
 	if jobs.Valid {
+		query := r.buildCountQuery(filter, "short", "")
+		query, err := SecurityCheck(ctx, query)
+		if err != nil {
+			return nil, err
+		}
+		var cnt sql.NullInt64
+		if err := query.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil {
+			log.Warn("Error while scanning rows")
+			return nil, err
+		}
-		stats = append(stats,
-			&model.JobsStatistics{
-				TotalJobs:     int(jobs.Int64),
-				TotalWalltime: int(walltime.Int64)})
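+		// Attach the short-jobs count to the single aggregated row.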
stats = append(stats, &model.JobsStatistics{ TotalJobs: int(jobs.Int64), - TotalWalltime: int(walltime.Int64)}) + TotalWalltime: int(walltime.Int64), + ShortJobs: int(cnt.Int64)}) } log.Infof("Timer JobStatistics %s", time.Since(start)) @@ -165,7 +206,7 @@ func (r *JobRepository) JobsStatsPlain( filter []*model.JobFilter) ([]*model.JobsStatistics, error) { start := time.Now() - query := r.buildJobsStatsQuery(filter, "") + query := r.buildStatsQuery(filter, "") query, err := SecurityCheck(ctx, query) if err != nil { return nil, err @@ -218,7 +259,7 @@ func (r *JobRepository) JobsStats( stats := map[string]*model.JobsStatistics{} col := groupBy2column[*groupBy] - query := r.buildJobsStatsQuery(filter, col) + query := r.buildStatsQuery(filter, col) query, err := SecurityCheck(ctx, query) if err != nil { return nil, err @@ -286,6 +327,77 @@ func (r *JobRepository) JobsStats( return res, nil } +type jobCountResult struct { + id string + shortJobs int + totalJobs int + runningJobs int +} + +func (r *JobRepository) JobCounts( + ctx context.Context, + filter []*model.JobFilter) ([]*model.JobsStatistics, error) { + + counts := make(map[string]jobCountResult) + start := time.Now() + query := r.buildCountQuery(filter, "short", "cluster") + query, err := SecurityCheck(ctx, query) + if err != nil { + return nil, err + } + rows, err := query.RunWith(r.DB).Query() + if err != nil { + log.Warn("Error while querying DB for job statistics") + return nil, err + } + + for rows.Next() { + var id sql.NullString + var cnt sql.NullInt64 + if err := rows.Scan(&id, &cnt); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + if id.Valid { + counts[id.String] = jobCountResult{id: id.String, shortJobs: int(cnt.Int64)} + } + } + + query = r.buildCountQuery(filter, "running", "cluster") + query, err = SecurityCheck(ctx, query) + if err != nil { + return nil, err + } + rows, err = query.RunWith(r.DB).Query() + if err != nil { + log.Warn("Error while querying DB for job statistics") + return nil, err + } + + for rows.Next() { + var id sql.NullString + var cnt sql.NullInt64 + if err := rows.Scan(&id, &cnt); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + if id.Valid { + counts[id.String].runningJobs = int(cnt.Int64) + } + } + + stats := make([]*model.JobsStatistics, 0, 20) + if id.Valid { + stats = append(stats, + &model.JobsStatistics{ + ID: id.String, + TotalJobs: int(jobs.Int64), + RunningJobs: int(walltime.Int64)}) + } + log.Infof("Timer JobStatistics %s", time.Since(start)) + return stats, nil +} + func (r *JobRepository) AddHistograms( ctx context.Context, filter []*model.JobFilter, From 5ba6f0ed3aecb82ba3beb5dcd7b05006b0b139e4 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 9 Jun 2023 09:09:41 +0200 Subject: [PATCH 05/12] Refactor and adapt to new API --- api/schema.graphqls | 7 +- internal/graph/generated/generated.go | 128 ++++++++++++++- internal/graph/model/models_gen.go | 2 + internal/graph/schema.resolvers.go | 64 ++------ internal/graph/{stats.go => util.go} | 13 ++ internal/repository/stats.go | 223 ++++++-------------------- internal/routerConfig/routes.go | 42 +---- pkg/schema/job.go | 17 +- web/templates/home.tmpl | 21 ++- 9 files changed, 239 insertions(+), 278 deletions(-) rename internal/graph/{stats.go => util.go} (95%) diff --git a/api/schema.graphqls b/api/schema.graphqls index b7c16ce..71a5373 100644 --- a/api/schema.graphqls +++ b/api/schema.graphqls @@ -286,10 +286,13 @@ type HistoPoint { type JobsStatistics { id: ID! 
# If `groupBy` was used, ID of the user/project/cluster name: String! # if User-Statistics: Given Name of Account (ID) Owner - totalJobs: Int! # Number of jobs that matched - shortJobs: Int! # Number of jobs with a duration of less than 2 minutes + totalJobs: Int! # Number of jobs + runningJobs: Int! # Number of running jobs + shortJobs: Int! # Number of jobs with a duration of less than duration totalWalltime: Int! # Sum of the duration of all matched jobs in hours + totalNodeHours: Int! # Sum of the node hours of all matched jobs totalCoreHours: Int! # Sum of the core hours of all matched jobs + totalAccHours: Int! # Sum of the gpu hours of all matched jobs histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes } diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index 907526e..0ef1fcb 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -143,9 +143,11 @@ type ComplexityRoot struct { HistNumNodes func(childComplexity int) int ID func(childComplexity int) int Name func(childComplexity int) int + RunningJobs func(childComplexity int) int ShortJobs func(childComplexity int) int TotalCoreHours func(childComplexity int) int TotalJobs func(childComplexity int) int + TotalNodeHours func(childComplexity int) int TotalWalltime func(childComplexity int) int } @@ -731,6 +733,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.JobsStatistics.Name(childComplexity), true + case "JobsStatistics.runningJobs": + if e.complexity.JobsStatistics.RunningJobs == nil { + break + } + + return e.complexity.JobsStatistics.RunningJobs(childComplexity), true + case "JobsStatistics.shortJobs": if e.complexity.JobsStatistics.ShortJobs == nil { break @@ -752,6 +761,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.JobsStatistics.TotalJobs(childComplexity), true + case "JobsStatistics.totalNodeHours": + if e.complexity.JobsStatistics.TotalNodeHours == nil { + break + } + + return e.complexity.JobsStatistics.TotalNodeHours(childComplexity), true + case "JobsStatistics.totalWalltime": if e.complexity.JobsStatistics.TotalWalltime == nil { break @@ -1764,9 +1780,11 @@ type HistoPoint { type JobsStatistics { id: ID! # If ` + "`" + `groupBy` + "`" + ` was used, ID of the user/project/cluster name: String! # if User-Statistics: Given Name of Account (ID) Owner - totalJobs: Int! # Number of jobs that matched - shortJobs: Int! # Number of jobs with a duration of less than 2 minutes + totalJobs: Int! # Number of jobs + runningJobs: Int! # Number of running jobs + shortJobs: Int! # Number of jobs with a duration of less than duration totalWalltime: Int! # Sum of the duration of all matched jobs in hours + totalNodeHours: Int! # Sum of the node hours of all matched jobs totalCoreHours: Int! # Sum of the core hours of all matched jobs histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value histNumNodes: [HistoPoint!]! 
# value: number of nodes, count: number of jobs with that number of nodes @@ -4884,6 +4902,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalJobs(ctx context.Co return fc, nil } +func (ec *executionContext) _JobsStatistics_runningJobs(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobsStatistics_runningJobs(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.RunningJobs, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobsStatistics_runningJobs(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobsStatistics", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _JobsStatistics_shortJobs(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { fc, err := ec.fieldContext_JobsStatistics_shortJobs(ctx, field) if err != nil { @@ -4972,6 +5034,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalWalltime(ctx contex return fc, nil } +func (ec *executionContext) _JobsStatistics_totalNodeHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobsStatistics_totalNodeHours(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.TotalNodeHours, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobsStatistics_totalNodeHours(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobsStatistics", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _JobsStatistics_totalCoreHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { fc, err := 
ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field) if err != nil { @@ -6867,10 +6973,14 @@ func (ec *executionContext) fieldContext_Query_jobsStatistics(ctx context.Contex return ec.fieldContext_JobsStatistics_name(ctx, field) case "totalJobs": return ec.fieldContext_JobsStatistics_totalJobs(ctx, field) + case "runningJobs": + return ec.fieldContext_JobsStatistics_runningJobs(ctx, field) case "shortJobs": return ec.fieldContext_JobsStatistics_shortJobs(ctx, field) case "totalWalltime": return ec.fieldContext_JobsStatistics_totalWalltime(ctx, field) + case "totalNodeHours": + return ec.fieldContext_JobsStatistics_totalNodeHours(ctx, field) case "totalCoreHours": return ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field) case "histDuration": @@ -12062,6 +12172,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti out.Values[i] = ec._JobsStatistics_totalJobs(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "runningJobs": + + out.Values[i] = ec._JobsStatistics_runningJobs(ctx, field, obj) + if out.Values[i] == graphql.Null { invalids++ } @@ -12076,6 +12193,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti out.Values[i] = ec._JobsStatistics_totalWalltime(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "totalNodeHours": + + out.Values[i] = ec._JobsStatistics_totalNodeHours(ctx, field, obj) + if out.Values[i] == graphql.Null { invalids++ } diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 552204e..4538098 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -90,8 +90,10 @@ type JobsStatistics struct { ID string `json:"id"` Name string `json:"name"` TotalJobs int `json:"totalJobs"` + RunningJobs int `json:"runningJobs"` ShortJobs int `json:"shortJobs"` TotalWalltime int `json:"totalWalltime"` + TotalNodeHours int `json:"totalNodeHours"` TotalCoreHours int `json:"totalCoreHours"` HistDuration []*HistoPoint `json:"histDuration"` HistNumNodes []*HistoPoint `json:"histNumNodes"` diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 91deb6d..de81bf7 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -11,7 +11,6 @@ import ( "strconv" "time" - "github.com/99designs/gqlgen/graphql" "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/graph/generated" "github.com/ClusterCockpit/cc-backend/internal/graph/model" @@ -234,20 +233,12 @@ func (r *queryResolver) JobMetrics(ctx context.Context, id string, metrics []str } // JobsFootprints is the resolver for the jobsFootprints field. -func (r *queryResolver) JobsFootprints( - ctx context.Context, - filter []*model.JobFilter, - metrics []string) (*model.Footprints, error) { - +func (r *queryResolver) JobsFootprints(ctx context.Context, filter []*model.JobFilter, metrics []string) (*model.Footprints, error) { return r.jobsFootprints(ctx, filter, metrics) } // Jobs is the resolver for the jobs field. 
-func (r *queryResolver) Jobs( - ctx context.Context, - filter []*model.JobFilter, - page *model.PageRequest, - order *model.OrderByInput) (*model.JobResultList, error) { +func (r *queryResolver) Jobs(ctx context.Context, filter []*model.JobFilter, page *model.PageRequest, order *model.OrderByInput) (*model.JobResultList, error) { if page == nil { page = &model.PageRequest{ ItemsPerPage: 50, @@ -276,23 +267,26 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF var stats []*model.JobsStatistics if requireField(ctx, "totalJobs") { - if requireField(ctx, "totalCoreHours") { - if groupBy == nil { - stats, err = r.Repo.JobsStatsPlain(ctx, filter) - } else { - stats, err = r.Repo.JobsStats(ctx, filter, groupBy) - } + if groupBy == nil { + stats, err = r.Repo.JobsStats(ctx, filter) } else { - if groupBy == nil { - stats, err = r.Repo.JobsStatsPlainNoCoreH(ctx, filter) - } else { - stats, err = r.Repo.JobsStatsNoCoreH(ctx, filter, groupBy) - } + stats, err = r.Repo.JobsStatsGrouped(ctx, filter, groupBy) } } else { stats = make([]*model.JobsStatistics, 0, 1) } + if groupBy != nil { + if requireField(ctx, "shortJobs") { + stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "short") + } + if requireField(ctx, "runningJobs") { + stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running") + } + } else { + return nil, errors.New("job counts are only implemented with a groupBy argument") + } + if err != nil { return nil, err } @@ -312,13 +306,7 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF } // JobsCount is the resolver for the jobsCount field. -func (r *queryResolver) JobsCount( - ctx context.Context, - filter []*model.JobFilter, - groupBy model.Aggregate, - weight *model.Weights, - limit *int) ([]*model.Count, error) { - +func (r *queryResolver) JobsCount(ctx context.Context, filter []*model.JobFilter, groupBy model.Aggregate, weight *model.Weights, limit *int) ([]*model.Count, error) { counts, err := r.Repo.CountGroupedJobs(ctx, groupBy, filter, weight, limit) if err != nil { log.Warn("Error while counting grouped jobs") @@ -412,21 +400,3 @@ type jobResolver struct{ *Resolver } type mutationResolver struct{ *Resolver } type queryResolver struct{ *Resolver } type subClusterResolver struct{ *Resolver } - -// !!! WARNING !!! -// The code below was going to be deleted when updating resolvers. It has been copied here so you have -// one last chance to move it out of harms way if you want. There are two reasons this happens: -// - When renaming or deleting a resolver the old code will be put in here. You can safely delete -// it when you're done. -// - You have helper methods in this file. Move them out to keep these resolver files clean. 
-func requireField(ctx context.Context, name string) bool { - fields := graphql.CollectAllFields(ctx) - - for _, f := range fields { - if f == name { - return true - } - } - - return false -} diff --git a/internal/graph/stats.go b/internal/graph/util.go similarity index 95% rename from internal/graph/stats.go rename to internal/graph/util.go index 46aac11..c9423e1 100644 --- a/internal/graph/stats.go +++ b/internal/graph/util.go @@ -10,6 +10,7 @@ import ( "fmt" "math" + "github.com/99designs/gqlgen/graphql" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/pkg/log" @@ -132,3 +133,15 @@ func (r *queryResolver) jobsFootprints(ctx context.Context, filter []*model.JobF Metrics: res, }, nil } + +func requireField(ctx context.Context, name string) bool { + fields := graphql.CollectAllFields(ctx) + + for _, f := range fields { + if f == name { + return true + } + } + + return false +} diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 2197a79..a54c587 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -13,7 +13,6 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" - "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" sq "github.com/Masterminds/squirrel" ) @@ -62,14 +61,18 @@ func (r *JobRepository) buildStatsQuery( castType := r.getCastType() if col != "" { - // Scan columns: id, totalJobs, totalWalltime + // Scan columns: id, totalJobs, totalWalltime, totalNodeHours, totalCoreHours query = sq.Select(col, "COUNT(job.id)", fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType), ).From("job").GroupBy(col) } else { - // Scan columns: totalJobs, totalWalltime + // Scan columns: totalJobs, totalWalltime, totalNodeHours, totalCoreHours query = sq.Select("COUNT(job.id)", fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType), ).From("job") } @@ -105,8 +108,7 @@ func (r *JobRepository) getCastType() string { return castType } -// with groupBy and without coreHours -func (r *JobRepository) JobsStatsNoCoreH( +func (r *JobRepository) JobsStatsGrouped( ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { @@ -129,8 +131,8 @@ func (r *JobRepository) JobsStatsNoCoreH( for rows.Next() { var id sql.NullString - var jobs, walltime sql.NullInt64 - if err := rows.Scan(&id, &jobs, &walltime); err != nil { + var jobs, walltime, nodeHours, coreHours sql.NullInt64 + if err := rows.Scan(&id, &jobs, &walltime, &nodeHours, &coreHours); err != nil { log.Warn("Error while scanning rows") return nil, err } @@ -141,7 +143,7 @@ func (r *JobRepository) JobsStatsNoCoreH( stats = append(stats, &model.JobsStatistics{ ID: id.String, - Name: &name, + Name: name, TotalJobs: int(jobs.Int64), TotalWalltime: int(walltime.Int64)}) } else { @@ -158,8 +160,7 @@ func (r *JobRepository) JobsStatsNoCoreH( return stats, nil } -// without groupBy and without coreHours -func (r 
*JobRepository) JobsStatsPlainNoCoreH( +func (r *JobRepository) JobsStats( ctx context.Context, filter []*model.JobFilter) ([]*model.JobsStatistics, error) { @@ -172,175 +173,32 @@ func (r *JobRepository) JobsStatsPlainNoCoreH( row := query.RunWith(r.DB).QueryRow() stats := make([]*model.JobsStatistics, 0, 1) - var jobs, walltime sql.NullInt64 - if err := row.Scan(&jobs, &walltime); err != nil { + + var jobs, walltime, nodeHours, coreHours sql.NullInt64 + if err := row.Scan(&jobs, &walltime, &nodeHours, &coreHours); err != nil { log.Warn("Error while scanning rows") return nil, err } if jobs.Valid { - query := r.buildCountQuery(filter, "short", "") - query, err := SecurityCheck(ctx, query) - if err != nil { - return nil, err - } - var cnt sql.NullInt64 - if err := query.RunWith(r.DB).QueryRow().Scan(&cnt); err != nil { - log.Warn("Error while scanning rows") - return nil, err - } stats = append(stats, &model.JobsStatistics{ TotalJobs: int(jobs.Int64), - TotalWalltime: int(walltime.Int64), - ShortJobs: int(cnt.Int64)}) + TotalWalltime: int(walltime.Int64)}) } log.Infof("Timer JobStatistics %s", time.Since(start)) return stats, nil } -// without groupBy and with coreHours -func (r *JobRepository) JobsStatsPlain( - ctx context.Context, - filter []*model.JobFilter) ([]*model.JobsStatistics, error) { - - start := time.Now() - query := r.buildStatsQuery(filter, "") - query, err := SecurityCheck(ctx, query) - if err != nil { - return nil, err - } - - castType := r.getCastType() - var totalJobs, totalWalltime, totalCoreHours int64 - - for _, cluster := range archive.Clusters { - for _, subcluster := range cluster.SubClusters { - - scQuery := query.Column(fmt.Sprintf( - "CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", - subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType)) - scQuery = scQuery.Where("job.cluster = ?", cluster.Name). - Where("job.subcluster = ?", subcluster.Name) - - row := scQuery.RunWith(r.DB).QueryRow() - var jobs, walltime, corehours sql.NullInt64 - if err := row.Scan(&jobs, &walltime, &corehours); err != nil { - log.Warn("Error while scanning rows") - return nil, err - } - - if jobs.Valid { - totalJobs += jobs.Int64 - totalWalltime += walltime.Int64 - totalCoreHours += corehours.Int64 - } - } - } - stats := make([]*model.JobsStatistics, 0, 1) - stats = append(stats, - &model.JobsStatistics{ - TotalJobs: int(totalJobs), - TotalWalltime: int(totalWalltime), - TotalCoreHours: int(totalCoreHours)}) - - log.Infof("Timer JobStatistics %s", time.Since(start)) - return stats, nil -} - -// with groupBy and with coreHours -func (r *JobRepository) JobsStats( +func (r *JobRepository) JobCountGrouped( ctx context.Context, filter []*model.JobFilter, groupBy *model.Aggregate) ([]*model.JobsStatistics, error) { start := time.Now() - - stats := map[string]*model.JobsStatistics{} col := groupBy2column[*groupBy] - query := r.buildStatsQuery(filter, col) - query, err := SecurityCheck(ctx, query) - if err != nil { - return nil, err - } - - castType := r.getCastType() - - for _, cluster := range archive.Clusters { - for _, subcluster := range cluster.SubClusters { - - scQuery := query.Column(fmt.Sprintf( - "CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as %s)", - subcluster.SocketsPerNode, subcluster.CoresPerSocket, castType)) - - scQuery = scQuery.Where("job.cluster = ?", cluster.Name). 
- Where("job.subcluster = ?", subcluster.Name) - - rows, err := scQuery.RunWith(r.DB).Query() - if err != nil { - log.Warn("Error while querying DB for job statistics") - return nil, err - } - - for rows.Next() { - var id sql.NullString - var jobs, walltime, corehours sql.NullInt64 - if err := rows.Scan(&id, &jobs, &walltime, &corehours); err != nil { - log.Warn("Error while scanning rows") - return nil, err - } - - if s, ok := stats[id.String]; ok { - s.TotalJobs += int(jobs.Int64) - s.TotalWalltime += int(walltime.Int64) - s.TotalCoreHours += int(corehours.Int64) - } else { - if col == "job.user" { - name := r.getUserName(ctx, id.String) - stats[id.String] = &model.JobsStatistics{ - ID: id.String, - Name: &name, - TotalJobs: int(jobs.Int64), - TotalWalltime: int(walltime.Int64), - TotalCoreHours: int(corehours.Int64), - } - } else { - stats[id.String] = &model.JobsStatistics{ - ID: id.String, - TotalJobs: int(jobs.Int64), - TotalWalltime: int(walltime.Int64), - TotalCoreHours: int(corehours.Int64), - } - } - } - } - } - } - - res := make([]*model.JobsStatistics, 0, len(stats)) - for _, stat := range stats { - res = append(res, stat) - } - - log.Infof("Timer JobStatistics %s", time.Since(start)) - return res, nil -} - -type jobCountResult struct { - id string - shortJobs int - totalJobs int - runningJobs int -} - -func (r *JobRepository) JobCounts( - ctx context.Context, - filter []*model.JobFilter) ([]*model.JobsStatistics, error) { - - counts := make(map[string]jobCountResult) - start := time.Now() - query := r.buildCountQuery(filter, "short", "cluster") + query := r.buildCountQuery(filter, "", col) query, err := SecurityCheck(ctx, query) if err != nil { return nil, err @@ -351,6 +209,8 @@ func (r *JobRepository) JobCounts( return nil, err } + stats := make([]*model.JobsStatistics, 0, 100) + for rows.Next() { var id sql.NullString var cnt sql.NullInt64 @@ -359,21 +219,39 @@ func (r *JobRepository) JobCounts( return nil, err } if id.Valid { - counts[id.String] = jobCountResult{id: id.String, shortJobs: int(cnt.Int64)} + stats = append(stats, + &model.JobsStatistics{ + ID: id.String, + TotalJobs: int(cnt.Int64)}) } } - query = r.buildCountQuery(filter, "running", "cluster") - query, err = SecurityCheck(ctx, query) + log.Infof("Timer JobStatistics %s", time.Since(start)) + return stats, nil +} + +func (r *JobRepository) AddJobCountGrouped( + ctx context.Context, + filter []*model.JobFilter, + groupBy *model.Aggregate, + stats []*model.JobsStatistics, + kind string) ([]*model.JobsStatistics, error) { + + start := time.Now() + col := groupBy2column[*groupBy] + query := r.buildCountQuery(filter, kind, col) + query, err := SecurityCheck(ctx, query) if err != nil { return nil, err } - rows, err = query.RunWith(r.DB).Query() + rows, err := query.RunWith(r.DB).Query() if err != nil { log.Warn("Error while querying DB for job statistics") return nil, err } + counts := make(map[string]int) + for rows.Next() { var id sql.NullString var cnt sql.NullInt64 @@ -382,18 +260,21 @@ func (r *JobRepository) JobCounts( return nil, err } if id.Valid { - counts[id.String].runningJobs = int(cnt.Int64) + counts[id.String] = int(cnt.Int64) } } - stats := make([]*model.JobsStatistics, 0, 20) - if id.Valid { - stats = append(stats, - &model.JobsStatistics{ - ID: id.String, - TotalJobs: int(jobs.Int64), - RunningJobs: int(walltime.Int64)}) + switch kind { + case "running": + for _, s := range stats { + s.RunningJobs = counts[s.ID] + } + case "short": + for _, s := range stats { + s.ShortJobs = counts[s.ID] + } } + 
log.Infof("Timer JobStatistics %s", time.Since(start)) return stats, nil } diff --git a/internal/routerConfig/routes.go b/internal/routerConfig/routes.go index 87d2c0e..2aa3f05 100644 --- a/internal/routerConfig/routes.go +++ b/internal/routerConfig/routes.go @@ -14,12 +14,9 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/api" "github.com/ClusterCockpit/cc-backend/internal/auth" - "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph/model" "github.com/ClusterCockpit/cc-backend/internal/repository" - "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" - "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/ClusterCockpit/cc-backend/web" "github.com/gorilla/mux" ) @@ -50,47 +47,20 @@ var routes []Route = []Route{ } func setupHomeRoute(i InfoType, r *http.Request) InfoType { - type cluster struct { - Name string - RunningJobs int - TotalJobs int - RecentShortJobs int - } jobRepo := repository.GetJobRepository() + groupBy := model.AggregateCluster - runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{ - State: []schema.JobState{schema.JobStateRunning}, - }}, nil, nil) + stats, err := jobRepo.JobCountGrouped(r.Context(), nil, &groupBy) if err != nil { log.Warnf("failed to count jobs: %s", err.Error()) - runningJobs = map[string]int{} - } - totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil, nil) - if err != nil { - log.Warnf("failed to count jobs: %s", err.Error()) - totalJobs = map[string]int{} - } - from := time.Now().Add(-24 * time.Hour) - recentShortJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{ - StartTime: &schema.TimeRange{From: &from, To: nil}, - Duration: &schema.IntRange{From: 0, To: config.Keys.ShortRunningJobsDuration}, - }}, nil, nil) - if err != nil { - log.Warnf("failed to count jobs: %s", err.Error()) - recentShortJobs = map[string]int{} } - clusters := make([]cluster, 0) - for _, c := range archive.Clusters { - clusters = append(clusters, cluster{ - Name: c.Name, - RunningJobs: runningJobs[c.Name], - TotalJobs: totalJobs[c.Name], - RecentShortJobs: recentShortJobs[c.Name], - }) + stats, err = jobRepo.AddJobCountGrouped(r.Context(), nil, &groupBy, stats, "running") + if err != nil { + log.Warnf("failed to count running jobs: %s", err.Error()) } - i["clusters"] = clusters + i["clusters"] = stats return i } diff --git a/pkg/schema/job.go b/pkg/schema/job.go index 3f75551..d967dd0 100644 --- a/pkg/schema/job.go +++ b/pkg/schema/job.go @@ -17,14 +17,15 @@ import ( type BaseJob struct { // The unique identifier of a job - JobID int64 `json:"jobId" db:"job_id" example:"123000"` - User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user - Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project - Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster - SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster - Partition string `json:"partition,omitempty" db:"partition" example:"main"` // The Slurm partition to which the job was submitted - ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` // The unique identifier of an array job - NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min 
> 0) + JobID int64 `json:"jobId" db:"job_id" example:"123000"` + User string `json:"user" db:"user" example:"abcd100h"` // The unique identifier of a user + Project string `json:"project" db:"project" example:"abcd200"` // The unique identifier of a project + Cluster string `json:"cluster" db:"cluster" example:"fritz"` // The unique identifier of a cluster + SubCluster string `json:"subCluster" db:"subcluster" example:"main"` // The unique identifier of a sub cluster + Partition string `json:"partition,omitempty" db:"partition" example:"main"` // The Slurm partition to which the job was submitted + ArrayJobId int64 `json:"arrayJobId,omitempty" db:"array_job_id" example:"123000"` // The unique identifier of an array job + NumNodes int32 `json:"numNodes" db:"num_nodes" example:"2" minimum:"1"` // Number of nodes used (Min > 0) + // NumCores int32 `json:"numCores" db:"num_cores" example:"20" minimum:"1"` // Number of cores used (Min > 0) NumHWThreads int32 `json:"numHwthreads,omitempty" db:"num_hwthreads" example:"20" minimum:"1"` // Number of HWThreads used (Min > 0) NumAcc int32 `json:"numAcc,omitempty" db:"num_acc" example:"2" minimum:"1"` // Number of accelerators used (Min > 0) Exclusive int32 `json:"exclusive" db:"exclusive" example:"1" minimum:"0" maximum:"2"` // Specifies how nodes are shared: 0 - Shared among multiple jobs of multiple users, 1 - Job exclusive (Default), 2 - Shared among multiple jobs of same user diff --git a/web/templates/home.tmpl b/web/templates/home.tmpl index 92c80f9..ff49e1e 100644 --- a/web/templates/home.tmpl +++ b/web/templates/home.tmpl @@ -6,9 +6,8 @@ Name - Running Jobs (short ones not listed) + Running Jobs Total Jobs - Short Jobs in past 24h {{if .User.HasRole .Roles.admin}} System View Analysis View @@ -19,21 +18,19 @@ {{if .User.HasRole .Roles.admin}} {{range .Infos.clusters}} - {{.Name}} - {{.RunningJobs}} jobs - {{.TotalJobs}} jobs - {{.RecentShortJobs}} - System View - Analysis View + {{.ID}} + {{.RunningJobs}} jobs + {{.TotalJobs}} jobs + System View + Analysis View {{end}} {{else}} {{range .Infos.clusters}} - {{.Name}} - {{.RunningJobs}} jobs - {{.TotalJobs}} jobs - {{.RecentShortJobs}} + {{.ID}} + {{.RunningJobs}} jobs + {{.TotalJobs}} jobs {{end}} {{end}} From 616095fe664f59f9156311b8ccee0210300edef0 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 9 Jun 2023 11:29:07 +0200 Subject: [PATCH 06/12] Add additional job stats, fix test --- internal/graph/generated/generated.go | 62 +++++++++++++++++++++++++++ internal/graph/model/models_gen.go | 1 + internal/repository/stats.go | 55 +++++++++++++++++------- internal/repository/stats_test.go | 5 +-- 4 files changed, 104 insertions(+), 19 deletions(-) diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index 0ef1fcb..9d5f7d9 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -145,6 +145,7 @@ type ComplexityRoot struct { Name func(childComplexity int) int RunningJobs func(childComplexity int) int ShortJobs func(childComplexity int) int + TotalAccHours func(childComplexity int) int TotalCoreHours func(childComplexity int) int TotalJobs func(childComplexity int) int TotalNodeHours func(childComplexity int) int @@ -747,6 +748,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.JobsStatistics.ShortJobs(childComplexity), true + case "JobsStatistics.totalAccHours": + if e.complexity.JobsStatistics.TotalAccHours == nil { + break + } + + return 
e.complexity.JobsStatistics.TotalAccHours(childComplexity), true + case "JobsStatistics.totalCoreHours": if e.complexity.JobsStatistics.TotalCoreHours == nil { break @@ -1786,6 +1794,7 @@ type JobsStatistics { totalWalltime: Int! # Sum of the duration of all matched jobs in hours totalNodeHours: Int! # Sum of the node hours of all matched jobs totalCoreHours: Int! # Sum of the core hours of all matched jobs + totalAccHours: Int! # Sum of the gpu hours of all matched jobs histDuration: [HistoPoint!]! # value: hour, count: number of jobs with a rounded duration of value histNumNodes: [HistoPoint!]! # value: number of nodes, count: number of jobs with that number of nodes } @@ -5122,6 +5131,50 @@ func (ec *executionContext) fieldContext_JobsStatistics_totalCoreHours(ctx conte return fc, nil } +func (ec *executionContext) _JobsStatistics_totalAccHours(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_JobsStatistics_totalAccHours(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.TotalAccHours, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_JobsStatistics_totalAccHours(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "JobsStatistics", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Int does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _JobsStatistics_histDuration(ctx context.Context, field graphql.CollectedField, obj *model.JobsStatistics) (ret graphql.Marshaler) { fc, err := ec.fieldContext_JobsStatistics_histDuration(ctx, field) if err != nil { @@ -6983,6 +7036,8 @@ func (ec *executionContext) fieldContext_Query_jobsStatistics(ctx context.Contex return ec.fieldContext_JobsStatistics_totalNodeHours(ctx, field) case "totalCoreHours": return ec.fieldContext_JobsStatistics_totalCoreHours(ctx, field) + case "totalAccHours": + return ec.fieldContext_JobsStatistics_totalAccHours(ctx, field) case "histDuration": return ec.fieldContext_JobsStatistics_histDuration(ctx, field) case "histNumNodes": @@ -12207,6 +12262,13 @@ func (ec *executionContext) _JobsStatistics(ctx context.Context, sel ast.Selecti out.Values[i] = ec._JobsStatistics_totalCoreHours(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "totalAccHours": + + out.Values[i] = ec._JobsStatistics_totalAccHours(ctx, field, obj) + if out.Values[i] == graphql.Null { invalids++ } diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 4538098..90a0be2 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -95,6 +95,7 @@ type JobsStatistics struct { TotalWalltime int `json:"totalWalltime"` 
TotalNodeHours int `json:"totalNodeHours"` TotalCoreHours int `json:"totalCoreHours"` + TotalAccHours int `json:"totalAccHours"` HistDuration []*HistoPoint `json:"histDuration"` HistNumNodes []*HistoPoint `json:"histNumNodes"` } diff --git a/internal/repository/stats.go b/internal/repository/stats.go index a54c587..dcfa49f 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -61,18 +61,20 @@ func (r *JobRepository) buildStatsQuery( castType := r.getCastType() if col != "" { - // Scan columns: id, totalJobs, totalWalltime, totalNodeHours, totalCoreHours + // Scan columns: id, totalJobs, totalWalltime, totalNodeHours, totalCoreHours, totalAccHours query = sq.Select(col, "COUNT(job.id)", fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType), fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s)", castType), ).From("job").GroupBy(col) } else { - // Scan columns: totalJobs, totalWalltime, totalNodeHours, totalCoreHours + // Scan columns: totalJobs, totalWalltime, totalNodeHours, totalCoreHours, totalAccHours query = sq.Select("COUNT(job.id)", fmt.Sprintf("CAST(ROUND(SUM(job.duration) / 3600) as %s)", castType), fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes) / 3600) as %s)", castType), fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_hwthreads) / 3600) as %s)", castType), + fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_acc) / 3600) as %s)", castType), ).From("job") } @@ -131,27 +133,40 @@ func (r *JobRepository) JobsStatsGrouped( for rows.Next() { var id sql.NullString - var jobs, walltime, nodeHours, coreHours sql.NullInt64 - if err := rows.Scan(&id, &jobs, &walltime, &nodeHours, &coreHours); err != nil { + var jobs, walltime, nodeHours, coreHours, accHours sql.NullInt64 + if err := rows.Scan(&id, &jobs, &walltime, &nodeHours, &coreHours, &accHours); err != nil { log.Warn("Error while scanning rows") return nil, err } if id.Valid { + var totalCoreHours, totalAccHours int + + if coreHours.Valid { + totalCoreHours = int(coreHours.Int64) + } + if accHours.Valid { + totalAccHours = int(accHours.Int64) + } + if col == "job.user" { name := r.getUserName(ctx, id.String) stats = append(stats, &model.JobsStatistics{ - ID: id.String, - Name: name, - TotalJobs: int(jobs.Int64), - TotalWalltime: int(walltime.Int64)}) + ID: id.String, + Name: name, + TotalJobs: int(jobs.Int64), + TotalWalltime: int(walltime.Int64), + TotalCoreHours: totalCoreHours, + TotalAccHours: totalAccHours}) } else { stats = append(stats, &model.JobsStatistics{ - ID: id.String, - TotalJobs: int(jobs.Int64), - TotalWalltime: int(walltime.Int64)}) + ID: id.String, + TotalJobs: int(jobs.Int64), + TotalWalltime: int(walltime.Int64), + TotalCoreHours: totalCoreHours, + TotalAccHours: totalAccHours}) } } } @@ -174,17 +189,27 @@ func (r *JobRepository) JobsStats( row := query.RunWith(r.DB).QueryRow() stats := make([]*model.JobsStatistics, 0, 1) - var jobs, walltime, nodeHours, coreHours sql.NullInt64 - if err := row.Scan(&jobs, &walltime, &nodeHours, &coreHours); err != nil { + var jobs, walltime, nodeHours, coreHours, accHours sql.NullInt64 + if err := row.Scan(&jobs, &walltime, &nodeHours, &coreHours, &accHours); err != nil { log.Warn("Error while scanning rows") return nil, err } if jobs.Valid { + var totalCoreHours, totalAccHours int + + if coreHours.Valid { + totalCoreHours = 
int(coreHours.Int64) + } + if accHours.Valid { + totalAccHours = int(accHours.Int64) + } stats = append(stats, &model.JobsStatistics{ - TotalJobs: int(jobs.Int64), - TotalWalltime: int(walltime.Int64)}) + TotalJobs: int(jobs.Int64), + TotalWalltime: int(walltime.Int64), + TotalCoreHours: totalCoreHours, + TotalAccHours: totalAccHours}) } log.Infof("Timer JobStatistics %s", time.Since(start)) diff --git a/internal/repository/stats_test.go b/internal/repository/stats_test.go index 6ed485b..b1a815e 100644 --- a/internal/repository/stats_test.go +++ b/internal/repository/stats_test.go @@ -11,14 +11,11 @@ import ( func TestBuildJobStatsQuery(t *testing.T) { r := setup(t) - q := r.buildJobsStatsQuery(nil, "USER") + q := r.buildStatsQuery(nil, "USER") sql, _, err := q.ToSql() noErr(t, err) fmt.Printf("SQL: %s\n", sql) - if 1 != 5 { - t.Errorf("wrong summary for diagnostic 3\ngot: %d \nwant: 1366", 5) - } } From 9f42f5b28f8fe2466a71bc5c4537207639a4e876 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 9 Jun 2023 12:28:24 +0200 Subject: [PATCH 07/12] Add TotalAccHours to project and user lists --- web/frontend/src/List.root.svelte | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/web/frontend/src/List.root.svelte b/web/frontend/src/List.root.svelte index b219f35..814a7df 100644 --- a/web/frontend/src/List.root.svelte +++ b/web/frontend/src/List.root.svelte @@ -45,6 +45,7 @@ totalJobs totalWalltime totalCoreHours + totalAccHours } }`, variables: { jobFilters } @@ -166,6 +167,18 @@ + + Total Accelerator Hours + + @@ -205,6 +218,7 @@ {row.totalJobs} {row.totalWalltime} {row.totalCoreHours} + {row.totalAccHours} {:else} From 3d75f0ef3c6c568a32a42715ffb9c34eee96bd41 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 9 Jun 2023 12:42:01 +0200 Subject: [PATCH 08/12] Fix missed variable name change --- web/frontend/src/Analysis.root.svelte | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/frontend/src/Analysis.root.svelte b/web/frontend/src/Analysis.root.svelte index b0dd704..f632162 100644 --- a/web/frontend/src/Analysis.root.svelte +++ b/web/frontend/src/Analysis.root.svelte @@ -65,7 +65,7 @@ histNumNodes { count, value } } - topUsers: jobsCount(filter: $filters, groupBy: USER, weight: NODE_HOURS, limit: 5) { name, count } + topUsers: jobsCount(filter: $jobFilters, groupBy: USER, weight: NODE_HOURS, limit: 5) { name, count } } `, variables: { jobFilters } From edb1b47281481e8e0efd697622e310ca16edd366 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 9 Jun 2023 13:15:25 +0200 Subject: [PATCH 09/12] Add AddJobCount() for analysis view --- internal/graph/schema.resolvers.go | 7 ++++- internal/repository/stats.go | 43 ++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index de81bf7..1828ee4 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -284,7 +284,12 @@ func (r *queryResolver) JobsStatistics(ctx context.Context, filter []*model.JobF stats, err = r.Repo.AddJobCountGrouped(ctx, filter, groupBy, stats, "running") } } else { - return nil, errors.New("job counts are only implemented with a groupBy argument") + if requireField(ctx, "shortJobs") { + stats, err = r.Repo.AddJobCount(ctx, filter, stats, "short") + } + if requireField(ctx, "runningJobs") { + stats, err = r.Repo.AddJobCount(ctx, filter, stats, "running") + } } if err != nil { return nil, err } diff --git a/internal/repository/stats.go 
b/internal/repository/stats.go index dcfa49f..1065417 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -304,6 +304,49 @@ func (r *JobRepository) AddJobCountGrouped( return stats, nil } +func (r *JobRepository) AddJobCount( + ctx context.Context, + filter []*model.JobFilter, + stats []*model.JobsStatistics, + kind string) ([]*model.JobsStatistics, error) { + + start := time.Now() + query := r.buildCountQuery(filter, kind, "") + query, err := SecurityCheck(ctx, query) + if err != nil { + return nil, err + } + rows, err := query.RunWith(r.DB).Query() + if err != nil { + log.Warn("Error while querying DB for job statistics") + return nil, err + } + + var count int + for rows.Next() { + var cnt sql.NullInt64 + if err := rows.Scan(&cnt); err != nil { + log.Warn("Error while scanning rows") + return nil, err + } + count = int(cnt.Int64) + } + + switch kind { + case "running": + for _, s := range stats { + s.RunningJobs = count + } + case "short": + for _, s := range stats { + s.ShortJobs = count + } + } + + log.Infof("Timer JobStatistics %s", time.Since(start)) + return stats, nil +} + func (r *JobRepository) AddHistograms( ctx context.Context, filter []*model.JobFilter, From 6e5afd119297cac2cb658a5ee5a665807c4daaf9 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 9 Jun 2023 13:30:55 +0200 Subject: [PATCH 10/12] Add switch for small histograms - adapts distance of labels to x axis --- web/frontend/src/Analysis.root.svelte | 2 +- web/frontend/src/plots/Histogram.svelte | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/web/frontend/src/Analysis.root.svelte b/web/frontend/src/Analysis.root.svelte index f632162..5acc970 100644 --- a/web/frontend/src/Analysis.root.svelte +++ b/web/frontend/src/Analysis.root.svelte @@ -160,7 +160,7 @@ {#key $statsQuery.data.topUsers}

Top Users (by node hours)

b.count - a.count).map(({ count }, idx) => ({ count, value: idx }))} label={(x) => x < $statsQuery.data.topUsers.length ? $statsQuery.data.topUsers[Math.floor(x)].name : 'No Users'} ylabel="Node Hours [h]"/> diff --git a/web/frontend/src/plots/Histogram.svelte b/web/frontend/src/plots/Histogram.svelte index aebc8f3..eaaf49c 100644 --- a/web/frontend/src/plots/Histogram.svelte +++ b/web/frontend/src/plots/Histogram.svelte @@ -24,6 +24,7 @@ export let ylabel = '' export let min = null export let max = null + export let small = false export let label = formatNumber const fontSize = 12 @@ -108,7 +109,7 @@ } else { const stepsizeX = getStepSize(maxValue, w, 120) for (let x = 0; x <= maxValue; x += stepsizeX) { - ctx.fillText(label(x), getCanvasX(x), height - paddingBottom - Math.floor(labelOffset / 2)) + ctx.fillText(label(x), getCanvasX(x), height - paddingBottom - Math.floor(labelOffset / (small ? 8 : 2))) } } From 71db1f8cdd7622a572fc9ad7611bd61e71855a68 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 9 Jun 2023 13:55:09 +0200 Subject: [PATCH 11/12] Remove hardcoded time label --- web/frontend/src/Analysis.root.svelte | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/frontend/src/Analysis.root.svelte b/web/frontend/src/Analysis.root.svelte index 5acc970..42100cf 100644 --- a/web/frontend/src/Analysis.root.svelte +++ b/web/frontend/src/Analysis.root.svelte @@ -143,7 +143,7 @@ {$statsQuery.data.stats[0].totalJobs} - Short Jobs (< 2m) + Short Jobs {$statsQuery.data.stats[0].shortJobs} From 918f1993f205e676b955bf6a4a84f43ab15fc28b Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 9 Jun 2023 15:02:22 +0200 Subject: [PATCH 12/12] Add loglevel access. Fix timer labels. --- internal/repository/dbConnection.go | 12 +++++++----- internal/repository/stats.go | 12 +++++++----- pkg/log/log.go | 8 ++++++++ 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/internal/repository/dbConnection.go b/internal/repository/dbConnection.go index 8082bc4..38a258a 100644 --- a/internal/repository/dbConnection.go +++ b/internal/repository/dbConnection.go @@ -6,10 +6,10 @@ package repository import ( "database/sql" - "log" "sync" "time" + "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/jmoiron/sqlx" "github.com/mattn/go-sqlite3" "github.com/qustavo/sqlhooks/v2" @@ -48,15 +48,17 @@ func Connect(driver string, db string) { switch driver { case "sqlite3": - sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{})) - // - Set WAL mode (not strictly necessary each time because it's persisted in the database, but good for first run) // - Set busy timeout, so concurrent writers wait on each other instead of erroring immediately // - Enable foreign key checks opts.URL += "?_journal=WAL&_timeout=5000&_fk=true" - dbHandle, err = sqlx.Open("sqlite3WithHooks", opts.URL) - // dbHandle, err = sqlx.Open("sqlite3", opts.URL) + if log.Loglevel() == "debug" { + sql.Register("sqlite3WithHooks", sqlhooks.Wrap(&sqlite3.SQLiteDriver{}, &Hooks{})) + dbHandle, err = sqlx.Open("sqlite3WithHooks", opts.URL) + } else { + dbHandle, err = sqlx.Open("sqlite3", opts.URL) + } if err != nil { log.Fatal(err) } diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 1065417..d96c20a 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -171,7 +171,7 @@ func (r *JobRepository) JobsStatsGrouped( } } - log.Infof("Timer JobStatistics %s", time.Since(start)) + log.Infof("Timer JobsStatsGrouped %s", time.Since(start)) return 
stats, nil } @@ -212,7 +212,7 @@ func (r *JobRepository) JobsStats( TotalAccHours: totalAccHours}) } - log.Infof("Timer JobStatistics %s", time.Since(start)) + log.Infof("Timer JobsStats %s", time.Since(start)) return stats, nil } @@ -251,7 +251,7 @@ func (r *JobRepository) JobCountGrouped( } } - log.Infof("Timer JobStatistics %s", time.Since(start)) + log.Infof("Timer JobCountGrouped %s", time.Since(start)) return stats, nil } @@ -300,7 +300,7 @@ func (r *JobRepository) AddJobCountGrouped( } } - log.Infof("Timer JobStatistics %s", time.Since(start)) + log.Infof("Timer AddJobCountGrouped %s", time.Since(start)) return stats, nil } @@ -343,7 +343,7 @@ func (r *JobRepository) AddJobCount( } } - log.Infof("Timer JobStatistics %s", time.Since(start)) + log.Infof("Timer AddJobCount %s", time.Since(start)) return stats, nil } @@ -351,6 +351,7 @@ func (r *JobRepository) AddHistograms( ctx context.Context, filter []*model.JobFilter, stat *model.JobsStatistics) (*model.JobsStatistics, error) { + start := time.Now() castType := r.getCastType() var err error @@ -367,6 +368,7 @@ func (r *JobRepository) AddHistograms( return nil, err } + log.Infof("Timer AddHistograms %s", time.Since(start)) return stat, nil } diff --git a/pkg/log/log.go b/pkg/log/log.go index f514df9..7e89753 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -41,6 +41,8 @@ var ( CritLog *log.Logger = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile) ) +var loglevel string = "info" + /* CONFIG */ func Init(lvl string, logdate bool) { @@ -78,6 +80,8 @@ func Init(lvl string, logdate bool) { ErrLog = log.New(ErrWriter, ErrPrefix, log.LstdFlags|log.Llongfile) CritLog = log.New(CritWriter, CritPrefix, log.LstdFlags|log.Llongfile) } + + loglevel = lvl } /* PRINT */ @@ -170,6 +174,10 @@ func Fatalf(format string, v ...interface{}) { os.Exit(1) } +func Loglevel() string { + return loglevel +} + /* SPECIAL */ // func Finfof(w io.Writer, format string, v ...interface{}) {
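
The series above settles on a composable pattern for the statistics resolver: one query produces the per-group totals, and a separate COUNT query per kind ("short", "running") is merged into that result by group ID, gated by requireField so a client that selects only totalJobs never pays for the extra scans. The following is a minimal, dependency-free Go sketch of that merge step; the trimmed-down jobsStatistics struct, the mergeJobCounts helper, and the sample cluster names are illustrative stand-ins, not the project's API — the real AddJobCountGrouped additionally builds the SQL count query and applies the security check.

package main

import "fmt"

// jobsStatistics mirrors only the fields involved in the merge; the real
// model.JobsStatistics also carries walltime, node/core/acc hours and histograms.
type jobsStatistics struct {
	ID          string
	TotalJobs   int
	RunningJobs int
	ShortJobs   int
}

// mergeJobCounts plays the role of AddJobCountGrouped: it takes the stats
// slice produced by the totals query and a per-group count map produced by
// the kind-specific COUNT query, and fills in the matching field.
func mergeJobCounts(stats []*jobsStatistics, counts map[string]int, kind string) []*jobsStatistics {
	for _, s := range stats {
		switch kind {
		case "running":
			s.RunningJobs = counts[s.ID] // groups absent from the map stay 0
		case "short":
			s.ShortJobs = counts[s.ID]
		}
	}
	return stats
}

func main() {
	// Totals as the grouped stats query would return them, keyed by cluster.
	stats := []*jobsStatistics{
		{ID: "fritz", TotalJobs: 120},
		{ID: "alex", TotalJobs: 45},
	}
	stats = mergeJobCounts(stats, map[string]int{"fritz": 7, "alex": 2}, "running")
	stats = mergeJobCounts(stats, map[string]int{"fritz": 31}, "short")

	for _, s := range stats {
		fmt.Printf("%s: total=%d running=%d short=%d\n", s.ID, s.TotalJobs, s.RunningJobs, s.ShortJobs)
	}
}

Keeping each count in its own query is what lets the home route and the analysis view reuse JobCountGrouped/AddJobCountGrouped in place of the earlier ad-hoc CountGroupedJobs calls, one COUNT per requested kind instead of one monolithic statistics query.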