From 743a89c3a218d9ba75c0272cb8c62836e0f70fa9 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Fri, 4 Jul 2025 15:14:15 +0200 Subject: [PATCH] Finalize node query backend functions, fix migration issue --- api/schema.graphqls | 2 + internal/graph/generated/generated.go | 10 +- internal/graph/model/models_gen.go | 1 + internal/graph/schema.resolvers.go | 19 ++- internal/importer/initDB.go | 2 +- internal/repository/job.go | 4 +- .../migrations/sqlite3/10_node-table.up.sql | 12 +- internal/repository/node.go | 124 +++++++++++++++++- 8 files changed, 162 insertions(+), 12 deletions(-) diff --git a/api/schema.graphqls b/api/schema.graphqls index 794c630..5ff1a36 100644 --- a/api/schema.graphqls +++ b/api/schema.graphqls @@ -300,6 +300,7 @@ type Query { user(username: String!): User allocatedNodes(cluster: String!): [Count!]! + ## Node Queries New node(id: ID!): Node nodes(filter: [NodeFilter!], order: OrderByInput): NodeStateResultList! nodeStats(filter: [NodeFilter!]): [NodeStats!]! @@ -393,6 +394,7 @@ type TimeRangeOutput { input NodeFilter { hostname: StringInput cluster: StringInput + subcluster: StringInput nodeState: NodeState healthState: MonitoringState } diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index 238270f..b150423 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -2714,6 +2714,7 @@ type TimeRangeOutput { input NodeFilter { hostname: StringInput cluster: StringInput + subcluster: StringInput nodeState: NodeState healthState: MonitoringState } @@ -17745,7 +17746,7 @@ func (ec *executionContext) unmarshalInputNodeFilter(ctx context.Context, obj an asMap[k] = v } - fieldsInOrder := [...]string{"hostname", "cluster", "nodeState", "healthState"} + fieldsInOrder := [...]string{"hostname", "cluster", "subcluster", "nodeState", "healthState"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -17766,6 +17767,13 @@ func (ec *executionContext) unmarshalInputNodeFilter(ctx context.Context, obj an return it, err } it.Cluster = data + case "subcluster": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("subcluster")) + data, err := ec.unmarshalOStringInput2ᚖgithubᚗcomᚋClusterCockpitᚋccᚑbackendᚋinternalᚋgraphᚋmodelᚐStringInput(ctx, v) + if err != nil { + return it, err + } + it.Subcluster = data case "nodeState": ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("nodeState")) data, err := ec.unmarshalONodeState2ᚖstring(ctx, v) diff --git a/internal/graph/model/models_gen.go b/internal/graph/model/models_gen.go index 5a32ac9..c5cc79b 100644 --- a/internal/graph/model/models_gen.go +++ b/internal/graph/model/models_gen.go @@ -170,6 +170,7 @@ type NamedStatsWithScope struct { type NodeFilter struct { Hostname *StringInput `json:"hostname,omitempty"` Cluster *StringInput `json:"cluster,omitempty"` + Subcluster *StringInput `json:"subcluster,omitempty"` NodeState *string `json:"nodeState,omitempty"` HealthState *schema.NodeState `json:"healthState,omitempty"` } diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 78a76ef..1284c09 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -380,7 +380,24 @@ func (r *queryResolver) Nodes(ctx context.Context, filter []*model.NodeFilter, o // NodeStats is the resolver for the nodeStats field. func (r *queryResolver) NodeStats(ctx context.Context, filter []*model.NodeFilter) ([]*model.NodeStats, error) { - panic(fmt.Errorf("not implemented: NodeStats - nodeStats")) + repo := repository.GetNodeRepository() + + stateCounts, serr := repo.CountNodeStates(ctx, filter) + if serr != nil { + cclog.Warnf("Error while counting nodeStates: %s", serr.Error()) + return nil, serr + } + + healthCounts, herr := repo.CountHealthStates(ctx, filter) + if herr != nil { + cclog.Warnf("Error while counting healthStates: %s", herr.Error()) + return nil, herr + } + + allCounts := make([]*model.NodeStats, 0) + allCounts = append(stateCounts, healthCounts...) + + return allCounts, nil } // Job is the resolver for the job field. diff --git a/internal/importer/initDB.go b/internal/importer/initDB.go index 179c21c..98dca03 100644 --- a/internal/importer/initDB.go +++ b/internal/importer/initDB.go @@ -40,7 +40,7 @@ func InitDB() error { } tags := make(map[string]int64) - // Not using log.Print because we want the line to end with `\r` and + // Not using cclog.Print because we want the line to end with `\r` and // this function is only ever called when a special command line flag // is passed anyways. fmt.Printf("%d jobs inserted...\r", 0) diff --git a/internal/repository/job.go b/internal/repository/job.go index b6aa323..2cde824 100644 --- a/internal/repository/job.go +++ b/internal/repository/job.go @@ -337,10 +337,10 @@ func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, ta // theSql, args, theErr := theQuery.ToSql() // if theErr != nil { - // log.Warn("Error while converting query to sql") + // cclog.Warn("Error while converting query to sql") // return "", err // } - // log.Debugf("SQL query (FindColumnValue): `%s`, args: %#v", theSql, args) + // cclog.Debugf("SQL query (FindColumnValue): `%s`, args: %#v", theSql, args) err := theQuery.RunWith(r.stmtCache).QueryRow().Scan(&result) diff --git a/internal/repository/migrations/sqlite3/10_node-table.up.sql b/internal/repository/migrations/sqlite3/10_node-table.up.sql index c208b32..52e6a05 100644 --- a/internal/repository/migrations/sqlite3/10_node-table.up.sql +++ b/internal/repository/migrations/sqlite3/10_node-table.up.sql @@ -3,12 +3,12 @@ CREATE TABLE "node" ( hostname VARCHAR(255) NOT NULL, cluster VARCHAR(255) NOT NULL, subcluster VARCHAR(255) NOT NULL, - cpus_allocated INTEGER NOT NULL, - cpus_total INTEGER NOT NULL, - memory_allocated INTEGER NOT NULL, - memory_total INTEGER NOT NULL, - gpus_allocated INTEGER NOT NULL, - gpus_total INTEGER NOT NULL, + cpus_allocated INTEGER DEFAULT 0 NOT NULL, + cpus_total INTEGER DEFAULT 0 NOT NULL, + memory_allocated INTEGER DEFAULT 0 NOT NULL, + memory_total INTEGER DEFAULT 0 NOT NULL, + gpus_allocated INTEGER DEFAULT 0 NOT NULL, + gpus_total INTEGER DEFAULT 0 NOT NULL, node_state VARCHAR(255) NOT NULL CHECK (node_state IN ( 'allocated', 'reserved', 'idle', 'mixed', diff --git a/internal/repository/node.go b/internal/repository/node.go index 83bf062..b4d0181 100644 --- a/internal/repository/node.go +++ b/internal/repository/node.go @@ -49,6 +49,11 @@ func GetNodeRepository() *NodeRepository { return nodeRepoInstance } +var nodeColumns []string = []string{ + "node.id", "node.hostname", "node.cluster", "node.subcluster", + "node.node_state", "node.health_state", "node.meta_data", +} + func (r *NodeRepository) FetchMetadata(node *schema.Node) (map[string]string, error) { start := time.Now() cachekey := fmt.Sprintf("metadata:%d", node.ID) @@ -220,7 +225,7 @@ func (r *NodeRepository) QueryNodes( filters []*model.NodeFilter, order *model.OrderByInput, ) ([]*schema.Node, error) { - query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("node")) + query, qerr := AccessCheck(ctx, sq.Select(nodeColumns...).From("node")) if qerr != nil { return nil, qerr } @@ -232,6 +237,9 @@ func (r *NodeRepository) QueryNodes( if f.Cluster != nil { query = buildStringCondition("node.cluster", f.Cluster, query) } + if f.Subcluster != nil { + query = buildStringCondition("node.subcluster", f.Subcluster, query) + } if f.NodeState != nil { query = query.Where("node.node_state = ?", f.NodeState) } @@ -287,3 +295,117 @@ func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) { return nodeList, nil } + +func (r *NodeRepository) CountNodeStates(ctx context.Context, filters []*model.NodeFilter) ([]*model.NodeStats, error) { + query, qerr := AccessCheck(ctx, sq.Select("node_state AS state", "count(*) AS count").From("node")) + if qerr != nil { + return nil, qerr + } + + for _, f := range filters { + if f.Hostname != nil { + query = buildStringCondition("node.hostname", f.Hostname, query) + } + if f.Cluster != nil { + query = buildStringCondition("node.cluster", f.Cluster, query) + } + if f.Subcluster != nil { + query = buildStringCondition("node.subcluster", f.Subcluster, query) + } + if f.NodeState != nil { + query = query.Where("node.node_state = ?", f.NodeState) + } + if f.HealthState != nil { + query = query.Where("node.health_state = ?", f.HealthState) + } + } + + rows, err := query.RunWith(r.stmtCache).Query() + if err != nil { + queryString, queryVars, _ := query.ToSql() + cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err) + return nil, err + } + + nodes := make([]*model.NodeStats, 0) + for rows.Next() { + node := model.NodeStats{} + + if err := rows.Scan(&node.State, &node.Count); err != nil { + rows.Close() + cclog.Warn("Error while scanning rows (NodeStats)") + return nil, err + } + nodes = append(nodes, &node) + } + + return nodes, nil +} + +func (r *NodeRepository) CountHealthStates(ctx context.Context, filters []*model.NodeFilter) ([]*model.NodeStats, error) { + query, qerr := AccessCheck(ctx, sq.Select("health_state AS state", "count(*) AS count").From("node")) + if qerr != nil { + return nil, qerr + } + + for _, f := range filters { + if f.Hostname != nil { + query = buildStringCondition("node.hostname", f.Hostname, query) + } + if f.Cluster != nil { + query = buildStringCondition("node.cluster", f.Cluster, query) + } + if f.Subcluster != nil { + query = buildStringCondition("node.subcluster", f.Subcluster, query) + } + if f.NodeState != nil { + query = query.Where("node.node_state = ?", f.NodeState) + } + if f.HealthState != nil { + query = query.Where("node.health_state = ?", f.HealthState) + } + } + + rows, err := query.RunWith(r.stmtCache).Query() + if err != nil { + queryString, queryVars, _ := query.ToSql() + cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err) + return nil, err + } + + nodes := make([]*model.NodeStats, 0) + for rows.Next() { + node := model.NodeStats{} + + if err := rows.Scan(&node.State, &node.Count); err != nil { + rows.Close() + cclog.Warn("Error while scanning rows (NodeStats)") + return nil, err + } + nodes = append(nodes, &node) + } + + return nodes, nil +} + +func AccessCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) { + user := GetUserFromContext(ctx) + return AccessCheckWithUser(user, query) +} + +func AccessCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.SelectBuilder, error) { + if user == nil { + var qnil sq.SelectBuilder + return qnil, fmt.Errorf("user context is nil") + } + + switch { + // case len(user.Roles) == 1 && user.HasRole(schema.RoleApi): // API-User : Access NodeInfos + // return query, nil + case user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}): // Admin & Support : Access NodeInfos + return query, nil + default: // No known Role: No Access, return error + var qnil sq.SelectBuilder + return qnil, fmt.Errorf("user has no or unknown roles") + } +}