From 2651b96499f32e76c509e95697927effdba7f58a Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 14 Mar 2022 09:08:02 +0100 Subject: [PATCH 1/7] Add subcluster and walltime to Job types --- graph/generated/generated.go | 98 ++++++++++++++++++++++++++++++++++++ graph/schema.graphqls | 2 + init-db.go | 2 + repository/import.go | 8 +-- repository/job.go | 27 ++++++++-- schema/job.go | 3 +- 6 files changed, 131 insertions(+), 9 deletions(-) diff --git a/graph/generated/generated.go b/graph/generated/generated.go index 204e623..3ed5804 100644 --- a/graph/generated/generated.go +++ b/graph/generated/generated.go @@ -98,8 +98,10 @@ type ComplexityRoot struct { SMT func(childComplexity int) int StartTime func(childComplexity int) int State func(childComplexity int) int + SubCluster func(childComplexity int) int Tags func(childComplexity int) int User func(childComplexity int) int + Walltime func(childComplexity int) int } JobMetric struct { @@ -503,6 +505,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Job.State(childComplexity), true + case "Job.subCluster": + if e.complexity.Job.SubCluster == nil { + break + } + + return e.complexity.Job.SubCluster(childComplexity), true + case "Job.tags": if e.complexity.Job.Tags == nil { break @@ -517,6 +526,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Job.User(childComplexity), true + case "Job.walltime": + if e.complexity.Job.Walltime == nil { + break + } + + return e.complexity.Job.Walltime(childComplexity), true + case "JobMetric.scope": if e.complexity.JobMetric.Scope == nil { break @@ -1212,8 +1228,10 @@ type Job { user: String! project: String! cluster: String! + subCluster: String! startTime: Time! duration: Int! + walltime: Int! numNodes: Int! numHWThreads: Int! numAcc: Int! 
@@ -2648,6 +2666,41 @@ func (ec *executionContext) _Job_cluster(ctx context.Context, field graphql.Coll return ec.marshalNString2string(ctx, field.Selections, res) } +func (ec *executionContext) _Job_subCluster(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "Job", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.SubCluster, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + func (ec *executionContext) _Job_startTime(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) { defer func() { if r := recover(); r != nil { @@ -2718,6 +2771,41 @@ func (ec *executionContext) _Job_duration(ctx context.Context, field graphql.Col return ec.marshalNInt2int32(ctx, field.Selections, res) } +func (ec *executionContext) _Job_walltime(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "Job", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + 
return obj.Walltime, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int64) + fc.Result = res + return ec.marshalNInt2int64(ctx, field.Selections, res) +} + func (ec *executionContext) _Job_numNodes(ctx context.Context, field graphql.CollectedField, obj *schema.Job) (ret graphql.Marshaler) { defer func() { if r := recover(); r != nil { @@ -7695,6 +7783,11 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj if out.Values[i] == graphql.Null { atomic.AddUint32(&invalids, 1) } + case "subCluster": + out.Values[i] = ec._Job_subCluster(ctx, field, obj) + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } case "startTime": out.Values[i] = ec._Job_startTime(ctx, field, obj) if out.Values[i] == graphql.Null { @@ -7705,6 +7798,11 @@ func (ec *executionContext) _Job(ctx context.Context, sel ast.SelectionSet, obj if out.Values[i] == graphql.Null { atomic.AddUint32(&invalids, 1) } + case "walltime": + out.Values[i] = ec._Job_walltime(ctx, field, obj) + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } case "numNodes": out.Values[i] = ec._Job_numNodes(ctx, field, obj) if out.Values[i] == graphql.Null { diff --git a/graph/schema.graphqls b/graph/schema.graphqls index 79553d4..0b85a34 100644 --- a/graph/schema.graphqls +++ b/graph/schema.graphqls @@ -11,8 +11,10 @@ type Job { user: String! project: String! cluster: String! + subCluster: String! startTime: Time! duration: Int! + walltime: Int! numNodes: Int! numHWThreads: Int! numAcc: Int! 
diff --git a/init-db.go b/init-db.go index b9fd9f7..0346080 100644 --- a/init-db.go +++ b/init-db.go @@ -25,6 +25,7 @@ const JOBS_DB_SCHEMA string = ` id INTEGER PRIMARY KEY /*!40101 AUTO_INCREMENT */, job_id BIGINT NOT NULL, cluster VARCHAR(255) NOT NULL, + subcluster VARCHAR(255) NOT NULL, start_time BIGINT NOT NULL, -- Unix timestamp user VARCHAR(255) NOT NULL, @@ -32,6 +33,7 @@ const JOBS_DB_SCHEMA string = ` ` + "`partition`" + ` VARCHAR(255) NOT NULL, -- partition is a keyword in mysql -.- array_job_id BIGINT NOT NULL, duration INT, + walltime INT, job_state VARCHAR(255) NOT NULL CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled', 'stopped', 'timeout', 'preempted', 'out_of_memory')), meta_data TEXT, -- JSON resources TEXT NOT NULL, -- JSON diff --git a/repository/import.go b/repository/import.go index ff996a4..94fba51 100644 --- a/repository/import.go +++ b/repository/import.go @@ -16,12 +16,12 @@ import ( ) const NamedJobInsert string = `INSERT INTO job ( - job_id, user, project, cluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc, - exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data, + job_id, user, project, cluster, subcluster, ` + "`partition`" + `, array_job_id, num_nodes, num_hwthreads, num_acc, + exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data, mem_used_max, flops_any_avg, mem_bw_avg, load_avg, net_bw_avg, net_data_vol_total, file_bw_avg, file_data_vol_total ) VALUES ( - :job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, - :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data, + :job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, + :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data, :mem_used_max, :flops_any_avg, 
:mem_bw_avg, :load_avg, :net_bw_avg, :net_data_vol_total, :file_bw_avg, :file_data_vol_total );` diff --git a/repository/job.go b/repository/job.go index 7f3f9ee..732ef6f 100644 --- a/repository/job.go +++ b/repository/job.go @@ -13,6 +13,7 @@ import ( "github.com/ClusterCockpit/cc-backend/graph/model" "github.com/ClusterCockpit/cc-backend/schema" sq "github.com/Masterminds/squirrel" + "github.com/iamlouk/lrucache" "github.com/jmoiron/sqlx" ) @@ -20,10 +21,12 @@ type JobRepository struct { DB *sqlx.DB stmtCache *sq.StmtCache + cache *lrucache.Cache } func (r *JobRepository) Init() error { r.stmtCache = sq.NewStmtCache(r.DB) + r.cache = lrucache.New(100) return nil } @@ -120,11 +123,11 @@ func (r *JobRepository) Start(job *schema.JobMeta) (id int64, err error) { } res, err := r.DB.NamedExec(`INSERT INTO job ( - job_id, user, project, cluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc, - exclusive, monitoring_status, smt, job_state, start_time, duration, resources, meta_data + job_id, user, project, cluster, subcluster, `+"`partition`"+`, array_job_id, num_nodes, num_hwthreads, num_acc, + exclusive, monitoring_status, smt, job_state, start_time, duration, walltime, resources, meta_data ) VALUES ( - :job_id, :user, :project, :cluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, - :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :resources, :meta_data + :job_id, :user, :project, :cluster, :subcluster, :partition, :array_job_id, :num_nodes, :num_hwthreads, :num_acc, + :exclusive, :monitoring_status, :smt, :job_state, :start_time, :duration, :walltime, :resources, :meta_data );`, job) if err != nil { return -1, err @@ -260,3 +263,19 @@ func (r *JobRepository) FindJobOrUser(ctx context.Context, searchterm string) (j return 0, "", ErrNotFound } + +func (r *JobRepository) Partitions(cluster string) ([]string, error) { + var err error + partitions := r.cache.Get("partitions:"+cluster, func() (interface{}, 
time.Duration, int) { + parts := []string{} + if err = r.DB.Select(&parts, `SELECT DISTINCT job.partition FROM job WHERE job.cluster = ?;`, cluster); err != nil { + return nil, 0, 1000 + } + + return parts, 1 * time.Hour, 1 + }) + if err != nil { + return nil, err + } + return partitions.([]string), nil +} diff --git a/schema/job.go b/schema/job.go index e6e9b25..db7c707 100644 --- a/schema/job.go +++ b/schema/job.go @@ -14,6 +14,7 @@ type BaseJob struct { User string `json:"user" db:"user"` Project string `json:"project" db:"project"` Cluster string `json:"cluster" db:"cluster"` + SubCluster string `json:"subCluster" db:"subcluster"` Partition string `json:"partition" db:"partition"` ArrayJobId int32 `json:"arrayJobId" db:"array_job_id"` NumNodes int32 `json:"numNodes" db:"num_nodes"` @@ -24,6 +25,7 @@ type BaseJob struct { SMT int32 `json:"smt" db:"smt"` State JobState `json:"jobState" db:"job_state"` Duration int32 `json:"duration" db:"duration"` + Walltime int64 `json:"walltime" db:"walltime"` Tags []*Tag `json:"tags"` RawResources []byte `json:"-" db:"resources"` Resources []*Resource `json:"resources"` @@ -54,7 +56,6 @@ type Job struct { type JobMeta struct { ID *int64 `json:"id,omitempty"` // never used in the job-archive, only available via REST-API BaseJob - Walltime int64 `json:"walltime"` // TODO: Missing in DB StartTime int64 `json:"startTime" db:"start_time"` Statistics map[string]JobStatistics `json:"statistics,omitempty"` } From 85ad6d9543e45b32185c4005d9947ca7effd2d60 Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 14 Mar 2022 10:18:56 +0100 Subject: [PATCH 2/7] subclusters instead of slurm partitions --- api_test.go | 16 +- config/config.go | 67 +- config/nodelist.go | 136 ++++ config/nodelist_test.go | 37 ++ graph/generated/generated.go | 1110 +++++++++++++++++---------------- graph/model/models.go | 2 +- graph/model/models_gen.go | 17 +- graph/schema.graphqls | 5 +- graph/schema.resolvers.go | 10 +- graph/stats.go | 6 +- 
metricdata/archive.go | 2 +- metricdata/cc-metric-store.go | 2 +- metricdata/metricdata.go | 4 +- repository/import.go | 7 +- repository/job.go | 8 +- 15 files changed, 868 insertions(+), 561 deletions(-) create mode 100644 config/nodelist.go create mode 100644 config/nodelist_test.go diff --git a/api_test.go b/api_test.go index 7fd3fa3..11a2454 100644 --- a/api_test.go +++ b/api_test.go @@ -30,9 +30,14 @@ func setup(t *testing.T) *api.RestApi { const testclusterJson = `{ "name": "testcluster", - "partitions": [ + "subClusters": [ { - "name": "default", + "name": "sc0", + "nodes": "host120,host121,host122" + }, + { + "name": "sc1", + "nodes": "host123,host124,host125", "processorType": "Intel Core i7-4770", "socketsPerNode": 1, "coresPerSocket": 4, @@ -141,7 +146,7 @@ func TestRestApi(t *testing.T) { Timestep: 60, Series: []schema.Series{ { - Hostname: "testhost", + Hostname: "host123", Statistics: &schema.MetricStatistics{Min: 0.1, Avg: 0.2, Max: 0.3}, Data: []schema.Float{0.1, 0.1, 0.1, 0.2, 0.2, 0.2, 0.3, 0.3, 0.3}, }, @@ -173,7 +178,7 @@ func TestRestApi(t *testing.T) { "tags": [{ "type": "testTagType", "name": "testTagName" }], "resources": [ { - "hostname": "testhost", + "hostname": "host123", "hwthreads": [0, 1, 2, 3, 4, 5, 6, 7] } ], @@ -211,6 +216,7 @@ func TestRestApi(t *testing.T) { job.User != "testuser" || job.Project != "testproj" || job.Cluster != "testcluster" || + job.SubCluster != "sc1" || job.Partition != "default" || job.ArrayJobId != 0 || job.NumNodes != 1 || @@ -219,7 +225,7 @@ func TestRestApi(t *testing.T) { job.Exclusive != 1 || job.MonitoringStatus != 1 || job.SMT != 1 || - !reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "testhost", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) || + !reflect.DeepEqual(job.Resources, []*schema.Resource{{Hostname: "host123", HWThreads: []int{0, 1, 2, 3, 4, 5, 6, 7}}}) || job.StartTime.Unix() != 123456789 { t.Fatalf("unexpected job properties: %#v", job) } diff --git a/config/config.go 
b/config/config.go index 816ad8f..ef49a12 100644 --- a/config/config.go +++ b/config/config.go @@ -20,10 +20,14 @@ import ( var db *sqlx.DB var lookupConfigStmt *sqlx.Stmt + var lock sync.RWMutex var uiDefaults map[string]interface{} + var cache *lrucache.Cache = lrucache.New(1024) + var Clusters []*model.Cluster +var nodeLists map[string]map[string]NodeList func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, jobArchive string) error { db = usersdb @@ -34,6 +38,7 @@ func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, j } Clusters = []*model.Cluster{} + nodeLists = map[string]map[string]NodeList{} for _, de := range entries { raw, err := os.ReadFile(filepath.Join(jobArchive, de.Name(), "cluster.json")) if err != nil { @@ -53,8 +58,8 @@ func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, j return err } - if len(cluster.Name) == 0 || len(cluster.MetricConfig) == 0 || len(cluster.Partitions) == 0 { - return errors.New("cluster.name, cluster.metricConfig and cluster.Partitions should not be empty") + if len(cluster.Name) == 0 || len(cluster.MetricConfig) == 0 || len(cluster.SubClusters) == 0 { + return errors.New("cluster.name, cluster.metricConfig and cluster.SubClusters should not be empty") } for _, mc := range cluster.MetricConfig { @@ -83,6 +88,19 @@ func Init(usersdb *sqlx.DB, authEnabled bool, uiConfig map[string]interface{}, j } Clusters = append(Clusters, &cluster) + + nodeLists[cluster.Name] = make(map[string]NodeList) + for _, sc := range cluster.SubClusters { + if sc.Nodes == "" { + continue + } + + nl, err := ParseNodeList(sc.Nodes) + if err != nil { + return fmt.Errorf("in %s/cluster.json: %w", cluster.Name, err) + } + nodeLists[cluster.Name][sc.Name] = nl + } } if authEnabled { @@ -188,7 +206,7 @@ func UpdateConfig(key, value string, ctx context.Context) error { return nil } -func GetClusterConfig(cluster string) *model.Cluster { +func GetCluster(cluster string) 
*model.Cluster { for _, c := range Clusters { if c.Name == cluster { return c @@ -197,11 +215,11 @@ func GetClusterConfig(cluster string) *model.Cluster { return nil } -func GetPartition(cluster, partition string) *model.Partition { +func GetSubCluster(cluster, subcluster string) *model.SubCluster { for _, c := range Clusters { if c.Name == cluster { - for _, p := range c.Partitions { - if p.Name == partition { + for _, p := range c.SubClusters { + if p.Name == subcluster { return p } } @@ -222,3 +240,40 @@ func GetMetricConfig(cluster, metric string) *model.MetricConfig { } return nil } + +// AssignSubCluster sets the `job.subcluster` property of the job based +// on its cluster and resources. +func AssignSubCluster(job *schema.BaseJob) error { + cluster := GetCluster(job.Cluster) + if cluster == nil { + return fmt.Errorf("unkown cluster: %#v", job.Cluster) + } + + if job.SubCluster != "" { + for _, sc := range cluster.SubClusters { + if sc.Name == job.SubCluster { + return nil + } + } + return fmt.Errorf("already assigned subcluster %#v unkown (cluster: %#v)", job.SubCluster, job.Cluster) + } + + if len(job.Resources) == 0 { + return fmt.Errorf("job without any resources/hosts") + } + + host0 := job.Resources[0].Hostname + for sc, nl := range nodeLists[job.Cluster] { + if nl != nil && nl.Contains(host0) { + job.SubCluster = sc + return nil + } + } + + if cluster.SubClusters[0].Nodes == "" { + job.SubCluster = cluster.SubClusters[0].Name + return nil + } + + return fmt.Errorf("no subcluster found for cluster %#v and host %#v", job.Cluster, host0) +} diff --git a/config/nodelist.go b/config/nodelist.go new file mode 100644 index 0000000..800e1ba --- /dev/null +++ b/config/nodelist.go @@ -0,0 +1,136 @@ +package config + +import ( + "fmt" + "strconv" + "strings" + + "github.com/ClusterCockpit/cc-backend/log" +) + +type NLExprString string + +func (nle NLExprString) consume(input string) (next string, ok bool) { + str := string(nle) + if strings.HasPrefix(input, str) 
{ + return strings.TrimPrefix(input, str), true + } + return "", false +} + +type NLExprIntRange struct { + start, end int64 + zeroPadded bool + digits int +} + +func (nle NLExprIntRange) consume(input string) (next string, ok bool) { + if !nle.zeroPadded || nle.digits < 1 { + log.Error("node list: only zero-padded ranges are allowed") + return "", false + } + + if len(input) < nle.digits { + return "", false + } + + numerals, rest := input[:nle.digits], input[nle.digits:] + for len(numerals) > 1 && numerals[0] == '0' { + numerals = numerals[1:] + } + + x, err := strconv.ParseInt(numerals, 10, 32) + if err != nil { + return "", false + } + + if nle.start <= x && x <= nle.end { + return rest, true + } + + return "", false +} + +type NodeList [][]interface { + consume(input string) (next string, ok bool) +} + +func (nl *NodeList) Contains(name string) bool { + var ok bool + for _, term := range *nl { + str := name + for _, expr := range term { + str, ok = expr.consume(str) + if !ok { + break + } + } + + if ok && str == "" { + return true + } + } + + return false +} + +func ParseNodeList(raw string) (NodeList, error) { + nl := NodeList{} + + isLetter := func(r byte) bool { return ('a' <= r && r <= 'z') || ('A' <= r && r <= 'Z') } + isDigit := func(r byte) bool { return '0' <= r && r <= '9' } + + for _, rawterm := range strings.Split(raw, ",") { + exprs := []interface { + consume(input string) (next string, ok bool) + }{} + for i := 0; i < len(rawterm); i++ { + c := rawterm[i] + if isLetter(c) || isDigit(c) { + j := i + for j < len(rawterm) && (isLetter(rawterm[j]) || isDigit(rawterm[j])) { + j++ + } + exprs = append(exprs, NLExprString(rawterm[i:j])) + i = j - 1 + } else if c == '[' { + end := strings.Index(rawterm[i:], "]") + if end == -1 { + return nil, fmt.Errorf("node list: unclosed '['") + } + + minus := strings.Index(rawterm[i:i+end], "-") + if minus == -1 { + return nil, fmt.Errorf("node list: no '-' found inside '[...]'") + } + + s1, s2 := 
rawterm[i+1:i+minus], rawterm[i+minus+1:i+end] + if len(s1) != len(s2) || len(s1) == 0 { + return nil, fmt.Errorf("node list: %#v and %#v are not of equal length or of length zero", s1, s2) + } + + x1, err := strconv.ParseInt(s1, 10, 32) + if err != nil { + return nil, fmt.Errorf("node list: %w", err) + } + x2, err := strconv.ParseInt(s2, 10, 32) + if err != nil { + return nil, fmt.Errorf("node list: %w", err) + } + + exprs = append(exprs, NLExprIntRange{ + start: x1, + end: x2, + digits: len(s1), + zeroPadded: true, + }) + i += end + } else { + return nil, fmt.Errorf("node list: invalid character: %#v", rune(c)) + } + } + nl = append(nl, exprs) + } + + return nl, nil +} diff --git a/config/nodelist_test.go b/config/nodelist_test.go new file mode 100644 index 0000000..6768d59 --- /dev/null +++ b/config/nodelist_test.go @@ -0,0 +1,37 @@ +package config + +import ( + "testing" +) + +func TestNodeList(t *testing.T) { + nl, err := ParseNodeList("hallo,wel123t,emmy[01-99],fritz[005-500],woody[100-200]") + if err != nil { + t.Fatal(err) + } + + // fmt.Printf("terms\n") + // for i, term := range nl.terms { + // fmt.Printf("term %d: %#v\n", i, term) + // } + + if nl.Contains("hello") || nl.Contains("woody") { + t.Fail() + } + + if nl.Contains("fritz1") || nl.Contains("fritz9") || nl.Contains("fritz004") || nl.Contains("woody201") { + t.Fail() + } + + if !nl.Contains("hallo") || !nl.Contains("wel123t") { + t.Fail() + } + + if !nl.Contains("emmy01") || !nl.Contains("emmy42") || !nl.Contains("emmy99") { + t.Fail() + } + + if !nl.Contains("woody100") || !nl.Contains("woody199") { + t.Fail() + } +} diff --git a/graph/generated/generated.go b/graph/generated/generated.go index 3ed5804..323229c 100644 --- a/graph/generated/generated.go +++ b/graph/generated/generated.go @@ -37,6 +37,7 @@ type Config struct { } type ResolverRoot interface { + Cluster() ClusterResolver Job() JobResolver Mutation() MutationResolver Query() QueryResolver @@ -56,7 +57,7 @@ type ComplexityRoot struct { 
FilterRanges func(childComplexity int) int MetricConfig func(childComplexity int) int Name func(childComplexity int) int - Partitions func(childComplexity int) int + SubClusters func(childComplexity int) int } Count struct { @@ -169,18 +170,6 @@ type ComplexityRoot struct { Metrics func(childComplexity int) int } - Partition struct { - CoresPerSocket func(childComplexity int) int - FlopRateScalar func(childComplexity int) int - FlopRateSimd func(childComplexity int) int - MemoryBandwidth func(childComplexity int) int - Name func(childComplexity int) int - ProcessorType func(childComplexity int) int - SocketsPerNode func(childComplexity int) int - ThreadsPerCore func(childComplexity int) int - Topology func(childComplexity int) int - } - Query struct { Clusters func(childComplexity int) int Job func(childComplexity int, id string) int @@ -214,6 +203,19 @@ type ComplexityRoot struct { Min func(childComplexity int) int } + SubCluster struct { + CoresPerSocket func(childComplexity int) int + FlopRateScalar func(childComplexity int) int + FlopRateSimd func(childComplexity int) int + MemoryBandwidth func(childComplexity int) int + Name func(childComplexity int) int + Nodes func(childComplexity int) int + ProcessorType func(childComplexity int) int + SocketsPerNode func(childComplexity int) int + ThreadsPerCore func(childComplexity int) int + Topology func(childComplexity int) int + } + Tag struct { ID func(childComplexity int) int Name func(childComplexity int) int @@ -235,6 +237,9 @@ type ComplexityRoot struct { } } +type ClusterResolver interface { + SubClusters(ctx context.Context, obj *model.Cluster) ([]*model.SubCluster, error) +} type JobResolver interface { MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) Tags(ctx context.Context, obj *schema.Job) ([]*schema.Tag, error) @@ -316,12 +321,12 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Cluster.Name(childComplexity), true - case 
"Cluster.partitions": - if e.complexity.Cluster.Partitions == nil { + case "Cluster.subClusters": + if e.complexity.Cluster.SubClusters == nil { break } - return e.complexity.Cluster.Partitions(childComplexity), true + return e.complexity.Cluster.SubClusters(childComplexity), true case "Count.count": if e.complexity.Count.Count == nil { @@ -824,69 +829,6 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.NodeMetrics.Metrics(childComplexity), true - case "Partition.coresPerSocket": - if e.complexity.Partition.CoresPerSocket == nil { - break - } - - return e.complexity.Partition.CoresPerSocket(childComplexity), true - - case "Partition.flopRateScalar": - if e.complexity.Partition.FlopRateScalar == nil { - break - } - - return e.complexity.Partition.FlopRateScalar(childComplexity), true - - case "Partition.flopRateSimd": - if e.complexity.Partition.FlopRateSimd == nil { - break - } - - return e.complexity.Partition.FlopRateSimd(childComplexity), true - - case "Partition.memoryBandwidth": - if e.complexity.Partition.MemoryBandwidth == nil { - break - } - - return e.complexity.Partition.MemoryBandwidth(childComplexity), true - - case "Partition.name": - if e.complexity.Partition.Name == nil { - break - } - - return e.complexity.Partition.Name(childComplexity), true - - case "Partition.processorType": - if e.complexity.Partition.ProcessorType == nil { - break - } - - return e.complexity.Partition.ProcessorType(childComplexity), true - - case "Partition.socketsPerNode": - if e.complexity.Partition.SocketsPerNode == nil { - break - } - - return e.complexity.Partition.SocketsPerNode(childComplexity), true - - case "Partition.threadsPerCore": - if e.complexity.Partition.ThreadsPerCore == nil { - break - } - - return e.complexity.Partition.ThreadsPerCore(childComplexity), true - - case "Partition.topology": - if e.complexity.Partition.Topology == nil { - break - } - - return e.complexity.Partition.Topology(childComplexity), 
true - case "Query.clusters": if e.complexity.Query.Clusters == nil { break @@ -1074,6 +1016,76 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.StatsSeries.Min(childComplexity), true + case "SubCluster.coresPerSocket": + if e.complexity.SubCluster.CoresPerSocket == nil { + break + } + + return e.complexity.SubCluster.CoresPerSocket(childComplexity), true + + case "SubCluster.flopRateScalar": + if e.complexity.SubCluster.FlopRateScalar == nil { + break + } + + return e.complexity.SubCluster.FlopRateScalar(childComplexity), true + + case "SubCluster.flopRateSimd": + if e.complexity.SubCluster.FlopRateSimd == nil { + break + } + + return e.complexity.SubCluster.FlopRateSimd(childComplexity), true + + case "SubCluster.memoryBandwidth": + if e.complexity.SubCluster.MemoryBandwidth == nil { + break + } + + return e.complexity.SubCluster.MemoryBandwidth(childComplexity), true + + case "SubCluster.name": + if e.complexity.SubCluster.Name == nil { + break + } + + return e.complexity.SubCluster.Name(childComplexity), true + + case "SubCluster.nodes": + if e.complexity.SubCluster.Nodes == nil { + break + } + + return e.complexity.SubCluster.Nodes(childComplexity), true + + case "SubCluster.processorType": + if e.complexity.SubCluster.ProcessorType == nil { + break + } + + return e.complexity.SubCluster.ProcessorType(childComplexity), true + + case "SubCluster.socketsPerNode": + if e.complexity.SubCluster.SocketsPerNode == nil { + break + } + + return e.complexity.SubCluster.SocketsPerNode(childComplexity), true + + case "SubCluster.threadsPerCore": + if e.complexity.SubCluster.ThreadsPerCore == nil { + break + } + + return e.complexity.SubCluster.ThreadsPerCore(childComplexity), true + + case "SubCluster.topology": + if e.complexity.SubCluster.Topology == nil { + break + } + + return e.complexity.SubCluster.Topology(childComplexity), true + case "Tag.id": if e.complexity.Tag.ID == nil { break @@ -1250,11 +1262,12 @@ type 
Cluster { name: String! metricConfig: [MetricConfig!]! filterRanges: FilterRanges! - partitions: [Partition!]! + subClusters: [SubCluster!]! } -type Partition { +type SubCluster { name: String! + nodes: String! processorType: String! socketsPerNode: Int! coresPerSocket: Int! @@ -2141,7 +2154,7 @@ func (ec *executionContext) _Cluster_filterRanges(ctx context.Context, field gra return ec.marshalNFilterRanges2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐFilterRanges(ctx, field.Selections, res) } -func (ec *executionContext) _Cluster_partitions(ctx context.Context, field graphql.CollectedField, obj *model.Cluster) (ret graphql.Marshaler) { +func (ec *executionContext) _Cluster_subClusters(ctx context.Context, field graphql.CollectedField, obj *model.Cluster) (ret graphql.Marshaler) { defer func() { if r := recover(); r != nil { ec.Error(ctx, ec.Recover(ctx, r)) @@ -2152,14 +2165,14 @@ func (ec *executionContext) _Cluster_partitions(ctx context.Context, field graph Object: "Cluster", Field: field, Args: nil, - IsMethod: false, - IsResolver: false, + IsMethod: true, + IsResolver: true, } ctx = graphql.WithFieldContext(ctx, fc) resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return obj.Partitions, nil + return ec.resolvers.Cluster().SubClusters(rctx, obj) }) if err != nil { ec.Error(ctx, err) @@ -2171,9 +2184,9 @@ func (ec *executionContext) _Cluster_partitions(ctx context.Context, field graph } return graphql.Null } - res := resTmp.([]*model.Partition) + res := resTmp.([]*model.SubCluster) fc.Result = res - return ec.marshalNPartition2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐPartitionᚄ(ctx, field.Selections, res) + return ec.marshalNSubCluster2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐSubClusterᚄ(ctx, field.Selections, res) } func (ec *executionContext) _Count_name(ctx context.Context, field graphql.CollectedField, obj *model.Count) (ret 
graphql.Marshaler) { @@ -4570,321 +4583,6 @@ func (ec *executionContext) _NodeMetrics_metrics(ctx context.Context, field grap return ec.marshalNJobMetricWithName2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐJobMetricWithNameᚄ(ctx, field.Selections, res) } -func (ec *executionContext) _Partition_name(ctx context.Context, field graphql.CollectedField, obj *model.Partition) (ret graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - fc := &graphql.FieldContext{ - Object: "Partition", - Field: field, - Args: nil, - IsMethod: false, - IsResolver: false, - } - - ctx = graphql.WithFieldContext(ctx, fc) - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.Name, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(string) - fc.Result = res - return ec.marshalNString2string(ctx, field.Selections, res) -} - -func (ec *executionContext) _Partition_processorType(ctx context.Context, field graphql.CollectedField, obj *model.Partition) (ret graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - fc := &graphql.FieldContext{ - Object: "Partition", - Field: field, - Args: nil, - IsMethod: false, - IsResolver: false, - } - - ctx = graphql.WithFieldContext(ctx, fc) - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.ProcessorType, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null 
- } - res := resTmp.(string) - fc.Result = res - return ec.marshalNString2string(ctx, field.Selections, res) -} - -func (ec *executionContext) _Partition_socketsPerNode(ctx context.Context, field graphql.CollectedField, obj *model.Partition) (ret graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - fc := &graphql.FieldContext{ - Object: "Partition", - Field: field, - Args: nil, - IsMethod: false, - IsResolver: false, - } - - ctx = graphql.WithFieldContext(ctx, fc) - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.SocketsPerNode, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(int) - fc.Result = res - return ec.marshalNInt2int(ctx, field.Selections, res) -} - -func (ec *executionContext) _Partition_coresPerSocket(ctx context.Context, field graphql.CollectedField, obj *model.Partition) (ret graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - fc := &graphql.FieldContext{ - Object: "Partition", - Field: field, - Args: nil, - IsMethod: false, - IsResolver: false, - } - - ctx = graphql.WithFieldContext(ctx, fc) - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.CoresPerSocket, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(int) - fc.Result = res - return ec.marshalNInt2int(ctx, field.Selections, res) -} - -func (ec *executionContext) 
_Partition_threadsPerCore(ctx context.Context, field graphql.CollectedField, obj *model.Partition) (ret graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - fc := &graphql.FieldContext{ - Object: "Partition", - Field: field, - Args: nil, - IsMethod: false, - IsResolver: false, - } - - ctx = graphql.WithFieldContext(ctx, fc) - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.ThreadsPerCore, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(int) - fc.Result = res - return ec.marshalNInt2int(ctx, field.Selections, res) -} - -func (ec *executionContext) _Partition_flopRateScalar(ctx context.Context, field graphql.CollectedField, obj *model.Partition) (ret graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - fc := &graphql.FieldContext{ - Object: "Partition", - Field: field, - Args: nil, - IsMethod: false, - IsResolver: false, - } - - ctx = graphql.WithFieldContext(ctx, fc) - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.FlopRateScalar, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(int) - fc.Result = res - return ec.marshalNInt2int(ctx, field.Selections, res) -} - -func (ec *executionContext) _Partition_flopRateSimd(ctx context.Context, field graphql.CollectedField, obj *model.Partition) (ret graphql.Marshaler) { - defer func() { - 
if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - fc := &graphql.FieldContext{ - Object: "Partition", - Field: field, - Args: nil, - IsMethod: false, - IsResolver: false, - } - - ctx = graphql.WithFieldContext(ctx, fc) - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.FlopRateSimd, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(int) - fc.Result = res - return ec.marshalNInt2int(ctx, field.Selections, res) -} - -func (ec *executionContext) _Partition_memoryBandwidth(ctx context.Context, field graphql.CollectedField, obj *model.Partition) (ret graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - fc := &graphql.FieldContext{ - Object: "Partition", - Field: field, - Args: nil, - IsMethod: false, - IsResolver: false, - } - - ctx = graphql.WithFieldContext(ctx, fc) - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.MemoryBandwidth, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(int) - fc.Result = res - return ec.marshalNInt2int(ctx, field.Selections, res) -} - -func (ec *executionContext) _Partition_topology(ctx context.Context, field graphql.CollectedField, obj *model.Partition) (ret graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - fc := &graphql.FieldContext{ - Object: 
"Partition", - Field: field, - Args: nil, - IsMethod: false, - IsResolver: false, - } - - ctx = graphql.WithFieldContext(ctx, fc) - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.Topology, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(*model.Topology) - fc.Result = res - return ec.marshalNTopology2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐTopology(ctx, field.Selections, res) -} - func (ec *executionContext) _Query_clusters(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { defer func() { if r := recover(); r != nil { @@ -5729,6 +5427,356 @@ func (ec *executionContext) _StatsSeries_max(ctx context.Context, field graphql. return ec.marshalNNullableFloat2ᚕgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋschemaᚐFloatᚄ(ctx, field.Selections, res) } +func (ec *executionContext) _SubCluster_name(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "SubCluster", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Name, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec 
*executionContext) _SubCluster_nodes(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "SubCluster", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Nodes, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) _SubCluster_processorType(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "SubCluster", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.ProcessorType, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) _SubCluster_socketsPerNode(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret 
graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "SubCluster", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.SocketsPerNode, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + +func (ec *executionContext) _SubCluster_coresPerSocket(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "SubCluster", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.CoresPerSocket, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + +func (ec *executionContext) _SubCluster_threadsPerCore(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() 
+ fc := &graphql.FieldContext{ + Object: "SubCluster", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.ThreadsPerCore, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + +func (ec *executionContext) _SubCluster_flopRateScalar(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "SubCluster", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.FlopRateScalar, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + +func (ec *executionContext) _SubCluster_flopRateSimd(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "SubCluster", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = 
graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.FlopRateSimd, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + +func (ec *executionContext) _SubCluster_memoryBandwidth(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "SubCluster", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.MemoryBandwidth, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(int) + fc.Result = res + return ec.marshalNInt2int(ctx, field.Selections, res) +} + +func (ec *executionContext) _SubCluster_topology(ctx context.Context, field graphql.CollectedField, obj *model.SubCluster) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "SubCluster", + Field: field, + Args: nil, + IsMethod: false, + IsResolver: false, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // 
use context from middleware stack in children + return obj.Topology, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(*model.Topology) + fc.Result = res + return ec.marshalNTopology2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐTopology(ctx, field.Selections, res) +} + func (ec *executionContext) _Tag_id(ctx context.Context, field graphql.CollectedField, obj *schema.Tag) (ret graphql.Marshaler) { defer func() { if r := recover(); r != nil { @@ -7586,23 +7634,32 @@ func (ec *executionContext) _Cluster(ctx context.Context, sel ast.SelectionSet, case "name": out.Values[i] = ec._Cluster_name(ctx, field, obj) if out.Values[i] == graphql.Null { - invalids++ + atomic.AddUint32(&invalids, 1) } case "metricConfig": out.Values[i] = ec._Cluster_metricConfig(ctx, field, obj) if out.Values[i] == graphql.Null { - invalids++ + atomic.AddUint32(&invalids, 1) } case "filterRanges": out.Values[i] = ec._Cluster_filterRanges(ctx, field, obj) if out.Values[i] == graphql.Null { - invalids++ - } - case "partitions": - out.Values[i] = ec._Cluster_partitions(ctx, field, obj) - if out.Values[i] == graphql.Null { - invalids++ + atomic.AddUint32(&invalids, 1) } + case "subClusters": + field := field + out.Concurrently(i, func() (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Cluster_subClusters(ctx, field, obj) + if res == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + return res + }) default: panic("unknown field " + strconv.Quote(field.Name)) } @@ -8263,73 +8320,6 @@ func (ec *executionContext) _NodeMetrics(ctx context.Context, sel ast.SelectionS return out } -var partitionImplementors = []string{"Partition"} - -func (ec *executionContext) _Partition(ctx context.Context, sel ast.SelectionSet, obj *model.Partition) 
graphql.Marshaler { - fields := graphql.CollectFields(ec.OperationContext, sel, partitionImplementors) - - out := graphql.NewFieldSet(fields) - var invalids uint32 - for i, field := range fields { - switch field.Name { - case "__typename": - out.Values[i] = graphql.MarshalString("Partition") - case "name": - out.Values[i] = ec._Partition_name(ctx, field, obj) - if out.Values[i] == graphql.Null { - invalids++ - } - case "processorType": - out.Values[i] = ec._Partition_processorType(ctx, field, obj) - if out.Values[i] == graphql.Null { - invalids++ - } - case "socketsPerNode": - out.Values[i] = ec._Partition_socketsPerNode(ctx, field, obj) - if out.Values[i] == graphql.Null { - invalids++ - } - case "coresPerSocket": - out.Values[i] = ec._Partition_coresPerSocket(ctx, field, obj) - if out.Values[i] == graphql.Null { - invalids++ - } - case "threadsPerCore": - out.Values[i] = ec._Partition_threadsPerCore(ctx, field, obj) - if out.Values[i] == graphql.Null { - invalids++ - } - case "flopRateScalar": - out.Values[i] = ec._Partition_flopRateScalar(ctx, field, obj) - if out.Values[i] == graphql.Null { - invalids++ - } - case "flopRateSimd": - out.Values[i] = ec._Partition_flopRateSimd(ctx, field, obj) - if out.Values[i] == graphql.Null { - invalids++ - } - case "memoryBandwidth": - out.Values[i] = ec._Partition_memoryBandwidth(ctx, field, obj) - if out.Values[i] == graphql.Null { - invalids++ - } - case "topology": - out.Values[i] = ec._Partition_topology(ctx, field, obj) - if out.Values[i] == graphql.Null { - invalids++ - } - default: - panic("unknown field " + strconv.Quote(field.Name)) - } - } - out.Dispatch() - if invalids > 0 { - return graphql.Null - } - return out -} - var queryImplementors = []string{"Query"} func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) graphql.Marshaler { @@ -8603,6 +8593,78 @@ func (ec *executionContext) _StatsSeries(ctx context.Context, sel ast.SelectionS return out } +var subClusterImplementors = 
[]string{"SubCluster"} + +func (ec *executionContext) _SubCluster(ctx context.Context, sel ast.SelectionSet, obj *model.SubCluster) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, subClusterImplementors) + + out := graphql.NewFieldSet(fields) + var invalids uint32 + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("SubCluster") + case "name": + out.Values[i] = ec._SubCluster_name(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "nodes": + out.Values[i] = ec._SubCluster_nodes(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "processorType": + out.Values[i] = ec._SubCluster_processorType(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "socketsPerNode": + out.Values[i] = ec._SubCluster_socketsPerNode(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "coresPerSocket": + out.Values[i] = ec._SubCluster_coresPerSocket(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "threadsPerCore": + out.Values[i] = ec._SubCluster_threadsPerCore(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "flopRateScalar": + out.Values[i] = ec._SubCluster_flopRateScalar(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "flopRateSimd": + out.Values[i] = ec._SubCluster_flopRateSimd(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "memoryBandwidth": + out.Values[i] = ec._SubCluster_memoryBandwidth(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + case "topology": + out.Values[i] = ec._SubCluster_topology(ctx, field, obj) + if out.Values[i] == graphql.Null { + invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch() + if invalids > 0 { + return graphql.Null + } + return out +} + var tagImplementors = 
[]string{"Tag"} func (ec *executionContext) _Tag(ctx context.Context, sel ast.SelectionSet, obj *schema.Tag) graphql.Marshaler { @@ -9760,53 +9822,6 @@ func (ec *executionContext) marshalNNullableFloat2ᚕgithubᚗcomᚋClusterCockp return ret } -func (ec *executionContext) marshalNPartition2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐPartitionᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.Partition) graphql.Marshaler { - ret := make(graphql.Array, len(v)) - var wg sync.WaitGroup - isLen1 := len(v) == 1 - if !isLen1 { - wg.Add(len(v)) - } - for i := range v { - i := i - fc := &graphql.FieldContext{ - Index: &i, - Result: &v[i], - } - ctx := graphql.WithFieldContext(ctx, fc) - f := func(i int) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = nil - } - }() - if !isLen1 { - defer wg.Done() - } - ret[i] = ec.marshalNPartition2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐPartition(ctx, sel, v[i]) - } - if isLen1 { - f(i) - } else { - go f(i) - } - - } - wg.Wait() - return ret -} - -func (ec *executionContext) marshalNPartition2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐPartition(ctx context.Context, sel ast.SelectionSet, v *model.Partition) graphql.Marshaler { - if v == nil { - if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - return ec._Partition(ctx, sel, v) -} - func (ec *executionContext) marshalNResource2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋschemaᚐResourceᚄ(ctx context.Context, sel ast.SelectionSet, v []*schema.Resource) graphql.Marshaler { ret := make(graphql.Array, len(v)) var wg sync.WaitGroup @@ -9913,6 +9928,53 @@ func (ec *executionContext) marshalNString2ᚕstringᚄ(ctx context.Context, sel return ret } +func (ec *executionContext) marshalNSubCluster2ᚕᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐSubClusterᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.SubCluster) graphql.Marshaler { + ret := 
make(graphql.Array, len(v)) + var wg sync.WaitGroup + isLen1 := len(v) == 1 + if !isLen1 { + wg.Add(len(v)) + } + for i := range v { + i := i + fc := &graphql.FieldContext{ + Index: &i, + Result: &v[i], + } + ctx := graphql.WithFieldContext(ctx, fc) + f := func(i int) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + if !isLen1 { + defer wg.Done() + } + ret[i] = ec.marshalNSubCluster2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐSubCluster(ctx, sel, v[i]) + } + if isLen1 { + f(i) + } else { + go f(i) + } + + } + wg.Wait() + return ret +} + +func (ec *executionContext) marshalNSubCluster2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋgraphᚋmodelᚐSubCluster(ctx context.Context, sel ast.SelectionSet, v *model.SubCluster) graphql.Marshaler { + if v == nil { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + return ec._SubCluster(ctx, sel, v) +} + func (ec *executionContext) marshalNTag2githubᚗcomᚋClusterCockpitᚋccᚑbackendᚋschemaᚐTag(ctx context.Context, sel ast.SelectionSet, v schema.Tag) graphql.Marshaler { return ec._Tag(ctx, sel, &v) } diff --git a/graph/model/models.go b/graph/model/models.go index e67098e..7a2d936 100644 --- a/graph/model/models.go +++ b/graph/model/models.go @@ -6,7 +6,7 @@ type Cluster struct { Name string `json:"name"` MetricConfig []*MetricConfig `json:"metricConfig"` FilterRanges *FilterRanges `json:"filterRanges"` - Partitions []*Partition `json:"partitions"` + SubClusters []*SubCluster `json:"subClusters"` // NOT part of the API: MetricDataRepository *MetricDataRepository `json:"metricDataRepository"` diff --git a/graph/model/models_gen.go b/graph/model/models_gen.go index 174f679..95f58b0 100644 --- a/graph/model/models_gen.go +++ b/graph/model/models_gen.go @@ -122,8 +122,16 @@ type PageRequest struct { Page int `json:"page"` } -type Partition struct { +type StringInput struct { + Eq *string 
`json:"eq"` + Contains *string `json:"contains"` + StartsWith *string `json:"startsWith"` + EndsWith *string `json:"endsWith"` +} + +type SubCluster struct { Name string `json:"name"` + Nodes string `json:"nodes"` ProcessorType string `json:"processorType"` SocketsPerNode int `json:"socketsPerNode"` CoresPerSocket int `json:"coresPerSocket"` @@ -134,13 +142,6 @@ type Partition struct { Topology *Topology `json:"topology"` } -type StringInput struct { - Eq *string `json:"eq"` - Contains *string `json:"contains"` - StartsWith *string `json:"startsWith"` - EndsWith *string `json:"endsWith"` -} - type TimeRange struct { From *time.Time `json:"from"` To *time.Time `json:"to"` diff --git a/graph/schema.graphqls b/graph/schema.graphqls index 0b85a34..25d44a8 100644 --- a/graph/schema.graphqls +++ b/graph/schema.graphqls @@ -33,11 +33,12 @@ type Cluster { name: String! metricConfig: [MetricConfig!]! filterRanges: FilterRanges! - partitions: [Partition!]! + subClusters: [SubCluster!]! } -type Partition { +type SubCluster { name: String! + nodes: String! processorType: String! socketsPerNode: Int! coresPerSocket: Int! 
diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index 3fa95b0..8fec60c 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -18,6 +18,10 @@ import ( "github.com/ClusterCockpit/cc-backend/schema" ) +func (r *clusterResolver) SubClusters(ctx context.Context, obj *model.Cluster) ([]*model.SubCluster, error) { + panic(fmt.Errorf("not implemented")) +} + func (r *jobResolver) MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) { return r.Repo.FetchMetadata(obj) } @@ -204,7 +208,7 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, partiti } if metrics == nil { - for _, mc := range config.GetClusterConfig(cluster).MetricConfig { + for _, mc := range config.GetCluster(cluster).MetricConfig { metrics = append(metrics, mc.Name) } } @@ -236,6 +240,9 @@ func (r *queryResolver) NodeMetrics(ctx context.Context, cluster string, partiti return nodeMetrics, nil } +// Cluster returns generated.ClusterResolver implementation. +func (r *Resolver) Cluster() generated.ClusterResolver { return &clusterResolver{r} } + // Job returns generated.JobResolver implementation. func (r *Resolver) Job() generated.JobResolver { return &jobResolver{r} } @@ -245,6 +252,7 @@ func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResol // Query returns generated.QueryResolver implementation. func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} } +type clusterResolver struct{ *Resolver } type jobResolver struct{ *Resolver } type mutationResolver struct{ *Resolver } type queryResolver struct{ *Resolver } diff --git a/graph/stats.go b/graph/stats.go index da21995..fb24bab 100644 --- a/graph/stats.go +++ b/graph/stats.go @@ -32,8 +32,8 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF // `socketsPerNode` and `coresPerSocket` can differ from cluster to cluster, so we need to explicitly loop over those. 
for _, cluster := range config.Clusters { - for _, partition := range cluster.Partitions { - corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as int)", partition.SocketsPerNode, partition.CoresPerSocket) + for _, subcluster := range cluster.SubClusters { + corehoursCol := fmt.Sprintf("CAST(ROUND(SUM(job.duration * job.num_nodes * %d * %d) / 3600) as int)", subcluster.SocketsPerNode, subcluster.CoresPerSocket) var query sq.SelectBuilder if groupBy == nil { query = sq.Select( @@ -54,7 +54,7 @@ func (r *queryResolver) jobsStatistics(ctx context.Context, filter []*model.JobF query = query. Where("job.cluster = ?", cluster.Name). - Where("job.partition = ?", partition.Name) + Where("job.subcluster = ?", subcluster.Name) query = repository.SecurityCheck(ctx, query) for _, f := range filter { diff --git a/metricdata/archive.go b/metricdata/archive.go index e2aff03..e3cae79 100644 --- a/metricdata/archive.go +++ b/metricdata/archive.go @@ -157,7 +157,7 @@ func GetStatistics(job *schema.Job) (map[string]schema.JobStatistics, error) { // Writes a running job to the job-archive func ArchiveJob(job *schema.Job, ctx context.Context) (*schema.JobMeta, error) { allMetrics := make([]string, 0) - metricConfigs := config.GetClusterConfig(job.Cluster).MetricConfig + metricConfigs := config.GetCluster(job.Cluster).MetricConfig for _, mc := range metricConfigs { allMetrics = append(allMetrics, mc.Name) } diff --git a/metricdata/cc-metric-store.go b/metricdata/cc-metric-store.go index 78d8750..c77d43d 100644 --- a/metricdata/cc-metric-store.go +++ b/metricdata/cc-metric-store.go @@ -227,7 +227,7 @@ var ( func (ccms *CCMetricStore) buildQueries(job *schema.Job, metrics []string, scopes []schema.MetricScope) ([]ApiQuery, []schema.MetricScope, error) { queries := make([]ApiQuery, 0, len(metrics)*len(scopes)*len(job.Resources)) - topology := config.GetPartition(job.Cluster, job.Partition).Topology + topology := config.GetSubCluster(job.Cluster, 
job.SubCluster).Topology assignedScope := []schema.MetricScope{} for _, metric := range metrics { diff --git a/metricdata/metricdata.go b/metricdata/metricdata.go index d4d9817..8f4122a 100644 --- a/metricdata/metricdata.go +++ b/metricdata/metricdata.go @@ -79,7 +79,7 @@ func LoadData(job *schema.Job, metrics []string, scopes []schema.MetricScope, ct } if metrics == nil { - cluster := config.GetClusterConfig(job.Cluster) + cluster := config.GetCluster(job.Cluster) for _, mc := range cluster.MetricConfig { metrics = append(metrics, mc.Name) } @@ -167,7 +167,7 @@ func LoadNodeData(cluster, partition string, metrics, nodes []string, scopes []s } if metrics == nil { - for _, m := range config.GetClusterConfig(cluster).MetricConfig { + for _, m := range config.GetCluster(cluster).MetricConfig { metrics = append(metrics, m.Name) } } diff --git a/repository/import.go b/repository/import.go index 94fba51..a18c189 100644 --- a/repository/import.go +++ b/repository/import.go @@ -122,12 +122,13 @@ func (r *JobRepository) ImportJob(jobMeta *schema.JobMeta, jobData *schema.JobDa return nil } +// This function also sets the subcluster if necessary! 
func SanityChecks(job *schema.BaseJob) error { - if c := config.GetClusterConfig(job.Cluster); c == nil { + if c := config.GetCluster(job.Cluster); c == nil { return fmt.Errorf("no such cluster: %#v", job.Cluster) } - if p := config.GetPartition(job.Cluster, job.Partition); p == nil { - return fmt.Errorf("no such partition: %#v (on cluster %#v)", job.Partition, job.Cluster) + if err := config.AssignSubCluster(job); err != nil { + return err } if !job.State.Valid() { return fmt.Errorf("not a valid job state: %#v", job.State) diff --git a/repository/job.go b/repository/job.go index 732ef6f..567b512 100644 --- a/repository/job.go +++ b/repository/job.go @@ -31,17 +31,17 @@ func (r *JobRepository) Init() error { } var jobColumns []string = []string{ - "job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.start_time", "job.partition", "job.array_job_id", + "job.id", "job.job_id", "job.user", "job.project", "job.cluster", "job.subcluster", "job.start_time", "job.partition", "job.array_job_id", "job.num_nodes", "job.num_hwthreads", "job.num_acc", "job.exclusive", "job.monitoring_status", "job.smt", "job.job_state", - "job.duration", "job.resources", // "job.meta_data", + "job.duration", "job.walltime", "job.resources", // "job.meta_data", } func scanJob(row interface{ Scan(...interface{}) error }) (*schema.Job, error) { job := &schema.Job{} if err := row.Scan( - &job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId, + &job.ID, &job.JobID, &job.User, &job.Project, &job.Cluster, &job.SubCluster, &job.StartTimeUnix, &job.Partition, &job.ArrayJobId, &job.NumNodes, &job.NumHWThreads, &job.NumAcc, &job.Exclusive, &job.MonitoringStatus, &job.SMT, &job.State, - &job.Duration, &job.RawResources /*&job.MetaData*/); err != nil { + &job.Duration, &job.Walltime, &job.RawResources /*&job.MetaData*/); err != nil { return nil, err } From 839db9fdae59013e259b73e8b10374056e91df7a Mon Sep 17 00:00:00 2001 From: Lou 
Knauer Date: Mon, 14 Mar 2022 10:24:27 +0100 Subject: [PATCH 3/7] List of slurm partitions via GraphQL --- gqlgen.yml | 4 ++ graph/generated/generated.go | 85 ++++++++++++++++++++++++++++-------- graph/schema.graphqls | 3 +- graph/schema.resolvers.go | 4 +- 4 files changed, 75 insertions(+), 21 deletions(-) diff --git a/gqlgen.yml b/gqlgen.yml index 576f2bb..f02ce61 100644 --- a/gqlgen.yml +++ b/gqlgen.yml @@ -61,6 +61,10 @@ models: resolver: true metaData: resolver: true + Cluster: + fields: + partitions: + resolver: true NullableFloat: { model: "github.com/ClusterCockpit/cc-backend/schema.Float" } MetricScope: { model: "github.com/ClusterCockpit/cc-backend/schema.MetricScope" } JobStatistics: { model: "github.com/ClusterCockpit/cc-backend/schema.JobStatistics" } diff --git a/graph/generated/generated.go b/graph/generated/generated.go index 323229c..877ec42 100644 --- a/graph/generated/generated.go +++ b/graph/generated/generated.go @@ -57,6 +57,7 @@ type ComplexityRoot struct { FilterRanges func(childComplexity int) int MetricConfig func(childComplexity int) int Name func(childComplexity int) int + Partitions func(childComplexity int) int SubClusters func(childComplexity int) int } @@ -238,7 +239,7 @@ type ComplexityRoot struct { } type ClusterResolver interface { - SubClusters(ctx context.Context, obj *model.Cluster) ([]*model.SubCluster, error) + Partitions(ctx context.Context, obj *model.Cluster) ([]string, error) } type JobResolver interface { MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) @@ -321,6 +322,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Cluster.Name(childComplexity), true + case "Cluster.partitions": + if e.complexity.Cluster.Partitions == nil { + break + } + + return e.complexity.Cluster.Partitions(childComplexity), true + case "Cluster.subClusters": if e.complexity.Cluster.SubClusters == nil { break @@ -1260,9 +1268,10 @@ type Job { type Cluster { name: String! 
+ partitions: [String!]! # Slurm partitions metricConfig: [MetricConfig!]! filterRanges: FilterRanges! - subClusters: [SubCluster!]! + subClusters: [SubCluster!]! # Hardware partitions/subclusters } type SubCluster { @@ -2084,6 +2093,41 @@ func (ec *executionContext) _Cluster_name(ctx context.Context, field graphql.Col return ec.marshalNString2string(ctx, field.Selections, res) } +func (ec *executionContext) _Cluster_partitions(ctx context.Context, field graphql.CollectedField, obj *model.Cluster) (ret graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + fc := &graphql.FieldContext{ + Object: "Cluster", + Field: field, + Args: nil, + IsMethod: true, + IsResolver: true, + } + + ctx = graphql.WithFieldContext(ctx, fc) + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Cluster().Partitions(rctx, obj) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.([]string) + fc.Result = res + return ec.marshalNString2ᚕstringᚄ(ctx, field.Selections, res) +} + func (ec *executionContext) _Cluster_metricConfig(ctx context.Context, field graphql.CollectedField, obj *model.Cluster) (ret graphql.Marshaler) { defer func() { if r := recover(); r != nil { @@ -2165,14 +2209,14 @@ func (ec *executionContext) _Cluster_subClusters(ctx context.Context, field grap Object: "Cluster", Field: field, Args: nil, - IsMethod: true, - IsResolver: true, + IsMethod: false, + IsResolver: false, } ctx = graphql.WithFieldContext(ctx, fc) resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.Cluster().SubClusters(rctx, 
obj) + return obj.SubClusters, nil }) if err != nil { ec.Error(ctx, err) @@ -7636,6 +7680,20 @@ func (ec *executionContext) _Cluster(ctx context.Context, sel ast.SelectionSet, if out.Values[i] == graphql.Null { atomic.AddUint32(&invalids, 1) } + case "partitions": + field := field + out.Concurrently(i, func() (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Cluster_partitions(ctx, field, obj) + if res == graphql.Null { + atomic.AddUint32(&invalids, 1) + } + return res + }) case "metricConfig": out.Values[i] = ec._Cluster_metricConfig(ctx, field, obj) if out.Values[i] == graphql.Null { @@ -7647,19 +7705,10 @@ func (ec *executionContext) _Cluster(ctx context.Context, sel ast.SelectionSet, atomic.AddUint32(&invalids, 1) } case "subClusters": - field := field - out.Concurrently(i, func() (res graphql.Marshaler) { - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - } - }() - res = ec._Cluster_subClusters(ctx, field, obj) - if res == graphql.Null { - atomic.AddUint32(&invalids, 1) - } - return res - }) + out.Values[i] = ec._Cluster_subClusters(ctx, field, obj) + if out.Values[i] == graphql.Null { + atomic.AddUint32(&invalids, 1) + } default: panic("unknown field " + strconv.Quote(field.Name)) } diff --git a/graph/schema.graphqls b/graph/schema.graphqls index 25d44a8..9c0e341 100644 --- a/graph/schema.graphqls +++ b/graph/schema.graphqls @@ -31,9 +31,10 @@ type Job { type Cluster { name: String! + partitions: [String!]! # Slurm partitions metricConfig: [MetricConfig!]! filterRanges: FilterRanges! - subClusters: [SubCluster!]! + subClusters: [SubCluster!]! 
# Hardware partitions/subclusters } type SubCluster { diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index 8fec60c..fdd1937 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -18,8 +18,8 @@ import ( "github.com/ClusterCockpit/cc-backend/schema" ) -func (r *clusterResolver) SubClusters(ctx context.Context, obj *model.Cluster) ([]*model.SubCluster, error) { - panic(fmt.Errorf("not implemented")) +func (r *clusterResolver) Partitions(ctx context.Context, obj *model.Cluster) ([]string, error) { + return r.Repo.Partitions(obj.Name) } func (r *jobResolver) MetaData(ctx context.Context, obj *schema.Job) (interface{}, error) { From e9195aa2e3ee008251663ac334b2a6e0502ab67c Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Mon, 14 Mar 2022 10:43:35 +0100 Subject: [PATCH 4/7] fix repository/ tests --- .github/workflows/test.yml | 5 +++-- test/test.db | Bin 131072 -> 131072 bytes 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3d6a89d..a631ef2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,5 +14,6 @@ jobs: run: | go build ./... go vet ./... - go test . - env BASEPATH="../" go test ./repository + go test . 
+ env BASEPATH="../" go test ./repository + env BASEPATH="../" go test ./config diff --git a/test/test.db b/test/test.db index 7dcff504474b7c7fdb8f402616a8fe01472c3a11..0d9b290daccf43c1a470997b5eb7b26e452bb4bf 100644 GIT binary patch delta 145 zcmZo@;Am*zm>@0a%D}*&0K_oBJyFM4(Un0j+JF}*#KjuQz;}lKI&UrCMAlHAkKC7e zj&M)pw%^!zm4&N2ot0f&SeS9T>E`?FQ(3un6v`8Ga!N9DQxzt!=N1(-&`~HZO-jxw qEiOqdQV4O4n7ol&m|ZD1F*8p|bMtgoJEqO0yv@0a!oa|w0K_oBIZ?-0(S24+bvQ=?r{F_|Nl}@^!LbW%p#$ zW|v`o$vS Date: Mon, 14 Mar 2022 11:08:01 +0100 Subject: [PATCH 5/7] Update frontend/ --- frontend | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend b/frontend index eae185a..7dbabf1 160000 --- a/frontend +++ b/frontend @@ -1 +1 @@ -Subproject commit eae185a9f6c006b61657df6897447f9a0761b42f +Subproject commit 7dbabf140d704b38a1fe29b8248e4226ce0c1b23 From 7be38277a98edfb6365e1e34150f91a67b387e9e Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Tue, 15 Mar 2022 08:29:29 +0100 Subject: [PATCH 6/7] cleanup and comments --- .github/workflows/test.yml | 4 +- init-db.go => repository/init.go | 37 +++--- repository/job_test.go | 9 +- routes.go | 128 +++++++++++++++++++ runtimeSetup.go | 7 ++ server.go | 208 +++++++------------------------ api_test.go => test/api_test.go | 14 +-- test/db.go | 26 ---- 8 files changed, 210 insertions(+), 223 deletions(-) rename init-db.go => repository/init.go (88%) rename api_test.go => test/api_test.go (95%) delete mode 100644 test/db.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a631ef2..1b0590e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,6 +14,4 @@ jobs: run: | go build ./... go vet ./... - go test . - env BASEPATH="../" go test ./repository - env BASEPATH="../" go test ./config + go test ./... 
diff --git a/init-db.go b/repository/init.go similarity index 88% rename from init-db.go rename to repository/init.go index 0346080..44b8bd6 100644 --- a/init-db.go +++ b/repository/init.go @@ -1,4 +1,4 @@ -package main +package repository import ( "bufio" @@ -9,14 +9,13 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/log" - "github.com/ClusterCockpit/cc-backend/repository" "github.com/ClusterCockpit/cc-backend/schema" "github.com/jmoiron/sqlx" ) // `AUTO_INCREMENT` is in a comment because of this hack: // https://stackoverflow.com/a/41028314 (sqlite creates unique ids automatically) -const JOBS_DB_SCHEMA string = ` +const JobsDBSchema string = ` DROP TABLE IF EXISTS jobtag; DROP TABLE IF EXISTS job; DROP TABLE IF EXISTS tag; @@ -32,8 +31,8 @@ const JOBS_DB_SCHEMA string = ` project VARCHAR(255) NOT NULL, ` + "`partition`" + ` VARCHAR(255) NOT NULL, -- partition is a keyword in mysql -.- array_job_id BIGINT NOT NULL, - duration INT, - walltime INT, + duration INT NOT NULL DEFAULT 0, + walltime INT NOT NULL DEFAULT 0, job_state VARCHAR(255) NOT NULL CHECK(job_state IN ('running', 'completed', 'failed', 'cancelled', 'stopped', 'timeout', 'preempted', 'out_of_memory')), meta_data TEXT, -- JSON resources TEXT NOT NULL, -- JSON @@ -68,7 +67,8 @@ const JOBS_DB_SCHEMA string = ` FOREIGN KEY (tag_id) REFERENCES tag (id) ON DELETE CASCADE); ` -const JOBS_DB_INDEXES string = ` +// Indexes are created after the job-archive is traversed for faster inserts. +const JobsDbIndexes string = ` CREATE INDEX job_by_user ON job (user); CREATE INDEX job_by_starttime ON job (start_time); CREATE INDEX job_by_job_id ON job (job_id); @@ -77,12 +77,12 @@ const JOBS_DB_INDEXES string = ` // Delete the tables "job", "tag" and "jobtag" from the database and // repopulate them using the jobs found in `archive`. 
-func initDB(db *sqlx.DB, archive string) error { +func InitDB(db *sqlx.DB, archive string) error { starttime := time.Now() log.Print("Building job table...") // Basic database structure: - _, err := db.Exec(JOBS_DB_SCHEMA) + _, err := db.Exec(JobsDBSchema) if err != nil { return err } @@ -96,16 +96,21 @@ func initDB(db *sqlx.DB, archive string) error { return err } + // Inserts are bundled into transactions because in sqlite, + // that speeds up inserts A LOT. tx, err := db.Beginx() if err != nil { return err } - stmt, err := tx.PrepareNamed(repository.NamedJobInsert) + stmt, err := tx.PrepareNamed(NamedJobInsert) if err != nil { return err } + // Not using log.Print because we want the line to end with `\r` and + // this function is only ever called when a special command line flag + // is passed anyway. fmt.Printf("%d jobs inserted...\r", 0) i := 0 tags := make(map[string]int64) @@ -159,6 +164,8 @@ func initDB(db *sqlx.DB, archive string) error { return err } + // For compatibility with the old job-archive directory structure where + // there was no start time directory. for _, startTimeDir := range startTimeDirs { if startTimeDir.Type().IsRegular() && startTimeDir.Name() == "meta.json" { if err := handleDirectory(dirpath); err != nil { @@ -180,7 +187,7 @@ func initDB(db *sqlx.DB, archive string) error { // Create indexes after inserts so that they do not // need to be continually updated. 
- if _, err := db.Exec(JOBS_DB_INDEXES); err != nil { + if _, err := db.Exec(JobsDbIndexes); err != nil { return err } @@ -226,7 +233,7 @@ func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path stri return err } - if err := repository.SanityChecks(&job.BaseJob); err != nil { + if err := SanityChecks(&job.BaseJob); err != nil { return err } @@ -262,11 +269,3 @@ func loadJob(tx *sqlx.Tx, stmt *sqlx.NamedStmt, tags map[string]int64, path stri return nil } - -func loadJobStat(job *schema.JobMeta, metric string) float64 { - if stats, ok := job.Statistics[metric]; ok { - return stats.Avg - } - - return 0.0 -} diff --git a/repository/job_test.go b/repository/job_test.go index 9d8d132..5cf54bb 100644 --- a/repository/job_test.go +++ b/repository/job_test.go @@ -5,14 +5,17 @@ import ( "testing" "github.com/jmoiron/sqlx" - - "github.com/ClusterCockpit/cc-backend/test" + _ "github.com/mattn/go-sqlite3" ) var db *sqlx.DB func init() { - db = test.InitDB() + var err error + db, err = sqlx.Open("sqlite3", "../test/test.db") + if err != nil { + fmt.Println(err) + } } func setup(t *testing.T) *JobRepository { diff --git a/routes.go b/routes.go index 0caeae2..243a4e7 100644 --- a/routes.go +++ b/routes.go @@ -9,6 +9,9 @@ import ( "github.com/ClusterCockpit/cc-backend/auth" "github.com/ClusterCockpit/cc-backend/config" + "github.com/ClusterCockpit/cc-backend/graph" + "github.com/ClusterCockpit/cc-backend/graph/model" + "github.com/ClusterCockpit/cc-backend/log" "github.com/ClusterCockpit/cc-backend/schema" "github.com/ClusterCockpit/cc-backend/templates" "github.com/gorilla/mux" @@ -24,6 +27,131 @@ type Route struct { Setup func(i InfoType, r *http.Request) InfoType } +var routes []Route = []Route{ + {"/", "home.tmpl", "ClusterCockpit", false, setupHomeRoute}, + {"/config", "config.tmpl", "Settings", false, func(i InfoType, r *http.Request) InfoType { return i }}, + {"/monitoring/jobs/", "monitoring/jobs.tmpl", "Jobs - ClusterCockpit", true, func(i InfoType, r 
*http.Request) InfoType { return i }}, + {"/monitoring/job/{id:[0-9]+}", "monitoring/job.tmpl", "Job - ClusterCockpit", false, setupJobRoute}, + {"/monitoring/users/", "monitoring/list.tmpl", "Users - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { i["listType"] = "USER"; return i }}, + {"/monitoring/projects/", "monitoring/list.tmpl", "Projects - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { i["listType"] = "PROJECT"; return i }}, + {"/monitoring/tags/", "monitoring/taglist.tmpl", "Tags - ClusterCockpit", false, setupTaglistRoute}, + {"/monitoring/user/{id}", "monitoring/user.tmpl", "User - ClusterCockpit", true, setupUserRoute}, + {"/monitoring/systems/{cluster}", "monitoring/systems.tmpl", "Cluster - ClusterCockpit", false, setupClusterRoute}, + {"/monitoring/node/{cluster}/{hostname}", "monitoring/node.tmpl", "Node - ClusterCockpit", false, setupNodeRoute}, + {"/monitoring/analysis/{cluster}", "monitoring/analysis.tmpl", "Analaysis - ClusterCockpit", true, setupAnalysisRoute}, +} + +func setupHomeRoute(i InfoType, r *http.Request) InfoType { + type cluster struct { + Name string + RunningJobs int + TotalJobs int + RecentShortJobs int + } + + runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{ + State: []schema.JobState{schema.JobStateRunning}, + }}, nil) + if err != nil { + log.Errorf("failed to count jobs: %s", err.Error()) + runningJobs = map[string]int{} + } + totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil) + if err != nil { + log.Errorf("failed to count jobs: %s", err.Error()) + totalJobs = map[string]int{} + } + + from := time.Now().Add(-24 * time.Hour) + recentShortJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{ + StartTime: &model.TimeRange{From: &from, To: nil}, + Duration: &model.IntRange{From: 0, To: graph.ShortJobDuration}, + }}, nil) + if err != nil { + 
log.Errorf("failed to count jobs: %s", err.Error()) + recentShortJobs = map[string]int{} + } + + clusters := make([]cluster, 0) + for _, c := range config.Clusters { + clusters = append(clusters, cluster{ + Name: c.Name, + RunningJobs: runningJobs[c.Name], + TotalJobs: totalJobs[c.Name], + RecentShortJobs: recentShortJobs[c.Name], + }) + } + + i["clusters"] = clusters + return i +} + +func setupJobRoute(i InfoType, r *http.Request) InfoType { + i["id"] = mux.Vars(r)["id"] + return i +} + +func setupUserRoute(i InfoType, r *http.Request) InfoType { + i["id"] = mux.Vars(r)["id"] + i["username"] = mux.Vars(r)["id"] + return i +} + +func setupClusterRoute(i InfoType, r *http.Request) InfoType { + vars := mux.Vars(r) + i["id"] = vars["cluster"] + i["cluster"] = vars["cluster"] + from, to := r.URL.Query().Get("from"), r.URL.Query().Get("to") + if from != "" || to != "" { + i["from"] = from + i["to"] = to + } + return i +} + +func setupNodeRoute(i InfoType, r *http.Request) InfoType { + vars := mux.Vars(r) + i["cluster"] = vars["cluster"] + i["hostname"] = vars["hostname"] + from, to := r.URL.Query().Get("from"), r.URL.Query().Get("to") + if from != "" || to != "" { + i["from"] = from + i["to"] = to + } + return i +} + +func setupAnalysisRoute(i InfoType, r *http.Request) InfoType { + i["cluster"] = mux.Vars(r)["cluster"] + return i +} + +func setupTaglistRoute(i InfoType, r *http.Request) InfoType { + var username *string = nil + if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleAdmin) { + username = &user.Username + } + + tags, counts, err := jobRepo.CountTags(username) + tagMap := make(map[string][]map[string]interface{}) + if err != nil { + log.Errorf("GetTags failed: %s", err.Error()) + i["tagmap"] = tagMap + return i + } + + for _, tag := range tags { + tagItem := map[string]interface{}{ + "id": tag.ID, + "name": tag.Name, + "count": counts[tag.Name], + } + tagMap[tag.Type] = append(tagMap[tag.Type], tagItem) + } + i["tagmap"] = tagMap + 
return i +} + func buildFilterPresets(query url.Values) map[string]interface{} { filterPresets := map[string]interface{}{} diff --git a/runtimeSetup.go b/runtimeSetup.go index 070cf30..f43e569 100644 --- a/runtimeSetup.go +++ b/runtimeSetup.go @@ -12,6 +12,9 @@ import ( "syscall" ) +// Very simple and limited .env file reader. +// All variable definitions found are directly +// added to the process's environment. func loadEnv(file string) error { f, err := os.Open(file) if err != nil { @@ -74,6 +77,10 @@ func loadEnv(file string) error { return s.Err() } +// Changes the process's user and group to that +// specified in the config.json. The go runtime +// takes care of all threads (and not only the calling one) +// executing the underlying system call. func dropPrivileges() error { if programConfig.Group != "" { g, err := user.LookupGroup(programConfig.Group) diff --git a/server.go b/server.go index c9361c1..de89430 100644 --- a/server.go +++ b/server.go @@ -25,11 +25,9 @@ import ( "github.com/ClusterCockpit/cc-backend/config" "github.com/ClusterCockpit/cc-backend/graph" "github.com/ClusterCockpit/cc-backend/graph/generated" - "github.com/ClusterCockpit/cc-backend/graph/model" "github.com/ClusterCockpit/cc-backend/log" "github.com/ClusterCockpit/cc-backend/metricdata" "github.com/ClusterCockpit/cc-backend/repository" - "github.com/ClusterCockpit/cc-backend/schema" "github.com/ClusterCockpit/cc-backend/templates" "github.com/google/gops/agent" "github.com/gorilla/handlers" @@ -40,7 +38,6 @@ import ( _ "github.com/mattn/go-sqlite3" ) -var db *sqlx.DB var jobRepo *repository.JobRepository // Format of the configurartion (file). See below for the defaults. 
@@ -127,147 +124,22 @@ var programConfig ProgramConfig = ProgramConfig{ }, } -func setupHomeRoute(i InfoType, r *http.Request) InfoType { - type cluster struct { - Name string - RunningJobs int - TotalJobs int - RecentShortJobs int - } - - runningJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{ - State: []schema.JobState{schema.JobStateRunning}, - }}, nil) - if err != nil { - log.Errorf("failed to count jobs: %s", err.Error()) - runningJobs = map[string]int{} - } - totalJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, nil, nil) - if err != nil { - log.Errorf("failed to count jobs: %s", err.Error()) - totalJobs = map[string]int{} - } - - from := time.Now().Add(-24 * time.Hour) - recentShortJobs, err := jobRepo.CountGroupedJobs(r.Context(), model.AggregateCluster, []*model.JobFilter{{ - StartTime: &model.TimeRange{From: &from, To: nil}, - Duration: &model.IntRange{From: 0, To: graph.ShortJobDuration}, - }}, nil) - if err != nil { - log.Errorf("failed to count jobs: %s", err.Error()) - recentShortJobs = map[string]int{} - } - - clusters := make([]cluster, 0) - for _, c := range config.Clusters { - clusters = append(clusters, cluster{ - Name: c.Name, - RunningJobs: runningJobs[c.Name], - TotalJobs: totalJobs[c.Name], - RecentShortJobs: recentShortJobs[c.Name], - }) - } - - i["clusters"] = clusters - return i -} - -func setupJobRoute(i InfoType, r *http.Request) InfoType { - i["id"] = mux.Vars(r)["id"] - return i -} - -func setupUserRoute(i InfoType, r *http.Request) InfoType { - i["id"] = mux.Vars(r)["id"] - i["username"] = mux.Vars(r)["id"] - return i -} - -func setupClusterRoute(i InfoType, r *http.Request) InfoType { - vars := mux.Vars(r) - i["id"] = vars["cluster"] - i["cluster"] = vars["cluster"] - from, to := r.URL.Query().Get("from"), r.URL.Query().Get("to") - if from != "" || to != "" { - i["from"] = from - i["to"] = to - } - return i -} - -func setupNodeRoute(i InfoType, r *http.Request) 
InfoType { - vars := mux.Vars(r) - i["cluster"] = vars["cluster"] - i["hostname"] = vars["hostname"] - from, to := r.URL.Query().Get("from"), r.URL.Query().Get("to") - if from != "" || to != "" { - i["from"] = from - i["to"] = to - } - return i -} - -func setupAnalysisRoute(i InfoType, r *http.Request) InfoType { - i["cluster"] = mux.Vars(r)["cluster"] - return i -} - -func setupTaglistRoute(i InfoType, r *http.Request) InfoType { - var username *string = nil - if user := auth.GetUser(r.Context()); user != nil && !user.HasRole(auth.RoleAdmin) { - username = &user.Username - } - - tags, counts, err := jobRepo.CountTags(username) - tagMap := make(map[string][]map[string]interface{}) - if err != nil { - log.Errorf("GetTags failed: %s", err.Error()) - i["tagmap"] = tagMap - return i - } - - for _, tag := range tags { - tagItem := map[string]interface{}{ - "id": tag.ID, - "name": tag.Name, - "count": counts[tag.Name], - } - tagMap[tag.Type] = append(tagMap[tag.Type], tagItem) - } - log.Infof("TAGS %+v", tags) - i["tagmap"] = tagMap - return i -} - -var routes []Route = []Route{ - {"/", "home.tmpl", "ClusterCockpit", false, setupHomeRoute}, - {"/config", "config.tmpl", "Settings", false, func(i InfoType, r *http.Request) InfoType { return i }}, - {"/monitoring/jobs/", "monitoring/jobs.tmpl", "Jobs - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { return i }}, - {"/monitoring/job/{id:[0-9]+}", "monitoring/job.tmpl", "Job - ClusterCockpit", false, setupJobRoute}, - {"/monitoring/users/", "monitoring/list.tmpl", "Users - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { i["listType"] = "USER"; return i }}, - {"/monitoring/projects/", "monitoring/list.tmpl", "Projects - ClusterCockpit", true, func(i InfoType, r *http.Request) InfoType { i["listType"] = "PROJECT"; return i }}, - {"/monitoring/tags/", "monitoring/taglist.tmpl", "Tags - ClusterCockpit", false, setupTaglistRoute}, - {"/monitoring/user/{id}", "monitoring/user.tmpl", "User - 
ClusterCockpit", true, setupUserRoute}, - {"/monitoring/systems/{cluster}", "monitoring/systems.tmpl", "Cluster - ClusterCockpit", false, setupClusterRoute}, - {"/monitoring/node/{cluster}/{hostname}", "monitoring/node.tmpl", "Node - ClusterCockpit", false, setupNodeRoute}, - {"/monitoring/analysis/{cluster}", "monitoring/analysis.tmpl", "Analaysis - ClusterCockpit", true, setupAnalysisRoute}, -} - func main() { var flagReinitDB, flagStopImmediately, flagSyncLDAP, flagGops bool var flagConfigFile, flagImportJob string var flagNewUser, flagDelUser, flagGenJWT string - flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize `job`, `tag`, and `jobtag` tables") - flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the `user` table with ldap") + flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)") + flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the 'user' table with ldap") flag.BoolVar(&flagStopImmediately, "no-server", false, "Do not start a server, stop right after initialization and argument handling") - flag.BoolVar(&flagGops, "gops", false, "Enable a github.com/google/gops/agent") - flag.StringVar(&flagConfigFile, "config", "", "Location of the config file for this server (overwrites the defaults)") + flag.BoolVar(&flagGops, "gops", false, "Listen via github.com/google/gops/agent (for debugging)") + flag.StringVar(&flagConfigFile, "config", "", "Overwrite the global config options by those specified in `config.json`") flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. 
Argument format: `:[admin,api,user]:`") - flag.StringVar(&flagDelUser, "del-user", "", "Remove user by username") - flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by the username") + flag.StringVar(&flagDelUser, "del-user", "", "Remove user by `username`") + flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by its `username`") flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `:,...`") flag.Parse() + // See https://github.com/google/gops (Runtime overhead is almost zero) if flagGops { if err := agent.Listen(agent.Options{}); err != nil { log.Fatalf("gops/agent.Listen failed: %s", err.Error()) @@ -291,18 +163,24 @@ func main() { } } + // As a special case for `db`, allow using an environment variable instead of the value + // stored in the config. This can be done for people having security concerns about storing + // the password for their mysql database in the config.json. if strings.HasPrefix(programConfig.DB, "env:") { envvar := strings.TrimPrefix(programConfig.DB, "env:") programConfig.DB = os.Getenv(envvar) } var err error + var db *sqlx.DB if programConfig.DBDriver == "sqlite3" { db, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", programConfig.DB)) if err != nil { log.Fatal(err) } + // sqlite does not multithread. Having more than one connection open would just mean + // waiting for locks. db.SetMaxOpenConns(1) } else if programConfig.DBDriver == "mysql" { db, err = sqlx.Open("mysql", fmt.Sprintf("%s?multiStatements=true", programConfig.DB)) @@ -317,7 +195,9 @@ func main() { log.Fatalf("unsupported database driver: %s", programConfig.DBDriver) } - // Initialize sub-modules... + // Initialize sub-modules and handle all command line flags. + // The order here is important! For example, the metricdata package + // depends on the config package. 
var authentication *auth.Authentication if !programConfig.DisableAuthentication { @@ -380,7 +260,7 @@ func main() { } if flagReinitDB { - if err := initDB(db, programConfig.JobArchive); err != nil { + if err := repository.InitDB(db, programConfig.JobArchive); err != nil { log.Fatal(err) } } @@ -400,11 +280,13 @@ func main() { return } - // Build routes... + // Setup the http.Handler/Router used by the server resolver := &graph.Resolver{DB: db, Repo: jobRepo} graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver})) if os.Getenv("DEBUG") != "1" { + // Having this handler means that an error message is returned via GraphQL instead of the connection simply being closed. + // The problem with this is that then, no more stacktrace is printed to stderr. graphQLEndpoint.SetRecoverFunc(func(ctx context.Context, err interface{}) error { switch e := err.(type) { case string: @@ -417,7 +299,6 @@ func main() { }) } - graphQLPlayground := playground.Handler("GraphQL playground", "/query") api := &api.RestApi{ JobRepository: jobRepo, Resolver: resolver, @@ -425,33 +306,21 @@ func main() { Authentication: authentication, } - handleGetLogin := func(rw http.ResponseWriter, r *http.Request) { - templates.Render(rw, r, "login.tmpl", &templates.Page{ - Title: "Login", - }) - } - r := mux.NewRouter() - r.NotFoundHandler = http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - templates.Render(rw, r, "404.tmpl", &templates.Page{ - Title: "Not found", - }) - }) - r.Handle("/playground", graphQLPlayground) - - r.HandleFunc("/login", handleGetLogin).Methods(http.MethodGet) + r.HandleFunc("/login", func(rw http.ResponseWriter, r *http.Request) { + templates.Render(rw, r, "login.tmpl", &templates.Page{Title: "Login"}) + }).Methods(http.MethodGet) r.HandleFunc("/imprint", func(rw http.ResponseWriter, r *http.Request) { - templates.Render(rw, r, "imprint.tmpl", &templates.Page{ - Title: "Imprint", - }) + templates.Render(rw,
r, "imprint.tmpl", &templates.Page{Title: "Imprint"}) }) r.HandleFunc("/privacy", func(rw http.ResponseWriter, r *http.Request) { - templates.Render(rw, r, "privacy.tmpl", &templates.Page{ - Title: "Privacy", - }) + templates.Render(rw, r, "privacy.tmpl", &templates.Page{Title: "Privacy"}) }) + // Some routes, such as /login or /query, should only be accessible to a user that is logged in. + // Those should be mounted to this subrouter. If authentication is enabled, a middleware will prevent + // any unauthenticated accesses. secured := r.PathPrefix("/").Subrouter() if !programConfig.DisableAuthentication { r.Handle("/login", authentication.Login( @@ -490,8 +359,11 @@ func main() { }) }) } + + r.Handle("/playground", playground.Handler("GraphQL playground", "/query")) secured.Handle("/query", graphQLEndpoint) + // Send a searchId and then reply with a redirect to a user or job. secured.HandleFunc("/search", func(rw http.ResponseWriter, r *http.Request) { if search := r.URL.Query().Get("searchId"); search != "" { job, username, err := api.JobRepository.FindJobOrUser(r.Context(), search) @@ -515,6 +387,7 @@ func main() { } }) + // Mount all /monitoring/... and /api/... routes. 
setupRoutes(secured, routes) api.MountRoutes(secured) @@ -525,11 +398,18 @@ func main() { handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization"}), handlers.AllowedMethods([]string{"GET", "POST", "HEAD", "OPTIONS"}), handlers.AllowedOrigins([]string{"*"}))) - handler := handlers.CustomLoggingHandler(log.InfoWriter, r, func(w io.Writer, params handlers.LogFormatterParams) { - log.Finfof(w, "%s %s (%d, %.02fkb, %dms)", - params.Request.Method, params.URL.RequestURI(), - params.StatusCode, float32(params.Size)/1024, - time.Since(params.TimeStamp).Milliseconds()) + handler := handlers.CustomLoggingHandler(io.Discard, r, func(_ io.Writer, params handlers.LogFormatterParams) { + if strings.HasPrefix(params.Request.RequestURI, "/api/") { + log.Infof("%s %s (%d, %.02fkb, %dms)", + params.Request.Method, params.URL.RequestURI(), + params.StatusCode, float32(params.Size)/1024, + time.Since(params.TimeStamp).Milliseconds()) + } else { + log.Debugf("%s %s (%d, %.02fkb, %dms)", + params.Request.Method, params.URL.RequestURI(), + params.StatusCode, float32(params.Size)/1024, + time.Since(params.TimeStamp).Milliseconds()) + } }) var wg sync.WaitGroup diff --git a/api_test.go b/test/api_test.go similarity index 95% rename from api_test.go rename to test/api_test.go index 11a2454..b4427f7 100644 --- a/api_test.go +++ b/test/api_test.go @@ -1,4 +1,4 @@ -package main +package test import ( "bytes" @@ -21,13 +21,11 @@ import ( "github.com/ClusterCockpit/cc-backend/schema" "github.com/gorilla/mux" "github.com/jmoiron/sqlx" + + _ "github.com/mattn/go-sqlite3" ) func setup(t *testing.T) *api.RestApi { - if db != nil { - panic("prefer using sub-tests (`t.Run`) or implement `cleanup` before calling setup twice.") - } - const testclusterJson = `{ "name": "testcluster", "subClusters": [ @@ -96,17 +94,17 @@ func setup(t *testing.T) *api.RestApi { } f.Close() - db, err = sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", dbfilepath)) + db, err := 
sqlx.Open("sqlite3", fmt.Sprintf("%s?_foreign_keys=on", dbfilepath)) if err != nil { t.Fatal(err) } db.SetMaxOpenConns(1) - if _, err := db.Exec(JOBS_DB_SCHEMA); err != nil { + if _, err := db.Exec(repository.JobsDBSchema); err != nil { t.Fatal(err) } - if err := config.Init(db, false, programConfig.UiDefaults, jobarchive); err != nil { + if err := config.Init(db, false, map[string]interface{}{}, jobarchive); err != nil { t.Fatal(err) } diff --git a/test/db.go b/test/db.go deleted file mode 100644 index 8553ef6..0000000 --- a/test/db.go +++ /dev/null @@ -1,26 +0,0 @@ -package test - -import ( - "fmt" - "os" - - "github.com/jmoiron/sqlx" - _ "github.com/mattn/go-sqlite3" -) - -func InitDB() *sqlx.DB { - - bp := "./" - ebp := os.Getenv("BASEPATH") - - if ebp != "" { - bp = ebp + "test/" - } - - db, err := sqlx.Open("sqlite3", bp+"test.db") - if err != nil { - fmt.Println(err) - } - - return db -} From 641959101ccc22ff9be030dd37668946f8b2a59b Mon Sep 17 00:00:00 2001 From: Lou Knauer Date: Tue, 15 Mar 2022 09:49:41 +0100 Subject: [PATCH 7/7] Add metadata to REST-API --- api/openapi.yaml | 3 +++ api/rest.go | 10 ++++++++++ 2 files changed, 13 insertions(+) diff --git a/api/openapi.yaml b/api/openapi.yaml index 6249f04..2babbf5 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -38,6 +38,9 @@ paths: - name: items-per-page in: query schema: { type: integer } + - name: with-metadata + in: query + schema: { type: boolean } responses: 200: description: 'Array of jobs' diff --git a/api/rest.go b/api/rest.go index eb031e6..ee83b6c 100644 --- a/api/rest.go +++ b/api/rest.go @@ -108,6 +108,7 @@ type TagJobApiRequest []*struct { // Return a list of jobs func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { + withMetadata := false filter := &model.JobFilter{} page := &model.PageRequest{ItemsPerPage: -1, Page: 1} order := &model.OrderByInput{Field: "startTime", Order: model.SortDirectionEnumDesc} @@ -156,6 +157,8 @@ func (api *RestApi) getJobs(rw 
http.ResponseWriter, r *http.Request) { return } page.ItemsPerPage = x + case "with-metadata": + withMetadata = true default: http.Error(rw, "invalid query parameter: "+key, http.StatusBadRequest) return @@ -170,6 +173,13 @@ func (api *RestApi) getJobs(rw http.ResponseWriter, r *http.Request) { results := make([]*schema.JobMeta, 0, len(jobs)) for _, job := range jobs { + if withMetadata { + if _, err := api.JobRepository.FetchMetadata(job); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + } + res := &schema.JobMeta{ ID: &job.ID, BaseJob: job.BaseJob,