mirror of
				https://github.com/ClusterCockpit/cc-backend
				synced 2025-10-31 07:55:06 +01:00 
			
		
		
		
	Merge branch 'dev' into metricstore
This commit is contained in:
		| @@ -337,10 +337,10 @@ func (r *JobRepository) FindColumnValue(user *schema.User, searchterm string, ta | ||||
|  | ||||
| 		// theSql, args, theErr := theQuery.ToSql() | ||||
| 		// if theErr != nil { | ||||
| 		// 	log.Warn("Error while converting query to sql") | ||||
| 		// 	cclog.Warn("Error while converting query to sql") | ||||
| 		// 	return "", err | ||||
| 		// } | ||||
| 		// log.Debugf("SQL query (FindColumnValue): `%s`, args: %#v", theSql, args) | ||||
| 		// cclog.Debugf("SQL query (FindColumnValue): `%s`, args: %#v", theSql, args) | ||||
|  | ||||
| 		err := theQuery.RunWith(r.stmtCache).QueryRow().Scan(&result) | ||||
|  | ||||
|   | ||||
| @@ -4,12 +4,13 @@ CREATE TABLE "node" ( | ||||
|     hostname VARCHAR(255) NOT NULL, | ||||
|     cluster VARCHAR(255) NOT NULL, | ||||
|     subcluster VARCHAR(255) NOT NULL, | ||||
|     cpus_allocated INTEGER NOT NULL, | ||||
|     cpus_total INTEGER NOT NULL, | ||||
|     memory_allocated INTEGER NOT NULL, | ||||
|     memory_total INTEGER NOT NULL, | ||||
|     gpus_allocated INTEGER NOT NULL, | ||||
|     gpus_total INTEGER NOT NULL, | ||||
|     jobs_running INTEGER DEFAULT 0 NOT NULL, | ||||
|     cpus_allocated INTEGER DEFAULT 0 NOT NULL, | ||||
|     cpus_total INTEGER DEFAULT 0 NOT NULL, | ||||
|     memory_allocated INTEGER DEFAULT 0 NOT NULL, | ||||
|     memory_total INTEGER DEFAULT 0 NOT NULL, | ||||
|     gpus_allocated INTEGER DEFAULT 0 NOT NULL, | ||||
|     gpus_total INTEGER DEFAULT 0 NOT NULL, | ||||
|     node_state VARCHAR(255) NOT NULL | ||||
|     CHECK (node_state IN ( | ||||
|         'allocated', 'reserved', 'idle', 'mixed', | ||||
|   | ||||
| @@ -49,6 +49,12 @@ func GetNodeRepository() *NodeRepository { | ||||
| 	return nodeRepoInstance | ||||
| } | ||||
|  | ||||
| var nodeColumns []string = []string{ | ||||
| 	// "node.id," | ||||
| 	"node.hostname", "node.cluster", "node.subcluster", | ||||
| 	"node.node_state", "node.health_state", // "node.meta_data", | ||||
| } | ||||
|  | ||||
| func (r *NodeRepository) FetchMetadata(node *schema.Node) (map[string]string, error) { | ||||
| 	start := time.Now() | ||||
| 	cachekey := fmt.Sprintf("metadata:%d", node.ID) | ||||
| @@ -218,9 +224,9 @@ func (r *NodeRepository) DeleteNode(id int64) error { | ||||
| func (r *NodeRepository) QueryNodes( | ||||
| 	ctx context.Context, | ||||
| 	filters []*model.NodeFilter, | ||||
| 	order *model.OrderByInput, | ||||
| 	order *model.OrderByInput, // Currently unused! | ||||
| ) ([]*schema.Node, error) { | ||||
| 	query, qerr := SecurityCheck(ctx, sq.Select(jobColumns...).From("node")) | ||||
| 	query, qerr := AccessCheck(ctx, sq.Select(nodeColumns...).From("node")) | ||||
| 	if qerr != nil { | ||||
| 		return nil, qerr | ||||
| 	} | ||||
| @@ -232,6 +238,9 @@ func (r *NodeRepository) QueryNodes( | ||||
| 		if f.Cluster != nil { | ||||
| 			query = buildStringCondition("node.cluster", f.Cluster, query) | ||||
| 		} | ||||
| 		if f.Subcluster != nil { | ||||
| 			query = buildStringCondition("node.subcluster", f.Subcluster, query) | ||||
| 		} | ||||
| 		if f.NodeState != nil { | ||||
| 			query = query.Where("node.node_state = ?", f.NodeState) | ||||
| 		} | ||||
| @@ -287,3 +296,123 @@ func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) { | ||||
|  | ||||
| 	return nodeList, nil | ||||
| } | ||||
|  | ||||
| func (r *NodeRepository) CountNodeStates(ctx context.Context, filters []*model.NodeFilter) ([]*model.NodeStates, error) { | ||||
| 	query, qerr := AccessCheck(ctx, sq.Select("node_state AS state", "count(*) AS count").From("node")) | ||||
| 	if qerr != nil { | ||||
| 		return nil, qerr | ||||
| 	} | ||||
|  | ||||
| 	for _, f := range filters { | ||||
| 		if f.Hostname != nil { | ||||
| 			query = buildStringCondition("node.hostname", f.Hostname, query) | ||||
| 		} | ||||
| 		if f.Cluster != nil { | ||||
| 			query = buildStringCondition("node.cluster", f.Cluster, query) | ||||
| 		} | ||||
| 		if f.Subcluster != nil { | ||||
| 			query = buildStringCondition("node.subcluster", f.Subcluster, query) | ||||
| 		} | ||||
| 		if f.NodeState != nil { | ||||
| 			query = query.Where("node.node_state = ?", f.NodeState) | ||||
| 		} | ||||
| 		if f.HealthState != nil { | ||||
| 			query = query.Where("node.health_state = ?", f.HealthState) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Add Group and Order | ||||
| 	query = query.GroupBy("state").OrderBy("count DESC") | ||||
|  | ||||
| 	rows, err := query.RunWith(r.stmtCache).Query() | ||||
| 	if err != nil { | ||||
| 		queryString, queryVars, _ := query.ToSql() | ||||
| 		cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	nodes := make([]*model.NodeStates, 0) | ||||
| 	for rows.Next() { | ||||
| 		node := model.NodeStates{} | ||||
|  | ||||
| 		if err := rows.Scan(&node.State, &node.Count); err != nil { | ||||
| 			rows.Close() | ||||
| 			cclog.Warn("Error while scanning rows (NodeStates)") | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		nodes = append(nodes, &node) | ||||
| 	} | ||||
|  | ||||
| 	return nodes, nil | ||||
| } | ||||
|  | ||||
| func (r *NodeRepository) CountHealthStates(ctx context.Context, filters []*model.NodeFilter) ([]*model.NodeStates, error) { | ||||
| 	query, qerr := AccessCheck(ctx, sq.Select("health_state AS state", "count(*) AS count").From("node")) | ||||
| 	if qerr != nil { | ||||
| 		return nil, qerr | ||||
| 	} | ||||
|  | ||||
| 	for _, f := range filters { | ||||
| 		if f.Hostname != nil { | ||||
| 			query = buildStringCondition("node.hostname", f.Hostname, query) | ||||
| 		} | ||||
| 		if f.Cluster != nil { | ||||
| 			query = buildStringCondition("node.cluster", f.Cluster, query) | ||||
| 		} | ||||
| 		if f.Subcluster != nil { | ||||
| 			query = buildStringCondition("node.subcluster", f.Subcluster, query) | ||||
| 		} | ||||
| 		if f.NodeState != nil { | ||||
| 			query = query.Where("node.node_state = ?", f.NodeState) | ||||
| 		} | ||||
| 		if f.HealthState != nil { | ||||
| 			query = query.Where("node.health_state = ?", f.HealthState) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Add Group and Order | ||||
| 	query = query.GroupBy("state").OrderBy("count DESC") | ||||
|  | ||||
| 	rows, err := query.RunWith(r.stmtCache).Query() | ||||
| 	if err != nil { | ||||
| 		queryString, queryVars, _ := query.ToSql() | ||||
| 		cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	nodes := make([]*model.NodeStates, 0) | ||||
| 	for rows.Next() { | ||||
| 		node := model.NodeStates{} | ||||
|  | ||||
| 		if err := rows.Scan(&node.State, &node.Count); err != nil { | ||||
| 			rows.Close() | ||||
| 			cclog.Warn("Error while scanning rows (NodeStates)") | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		nodes = append(nodes, &node) | ||||
| 	} | ||||
|  | ||||
| 	return nodes, nil | ||||
| } | ||||
|  | ||||
| func AccessCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) { | ||||
| 	user := GetUserFromContext(ctx) | ||||
| 	return AccessCheckWithUser(user, query) | ||||
| } | ||||
|  | ||||
| func AccessCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.SelectBuilder, error) { | ||||
| 	if user == nil { | ||||
| 		var qnil sq.SelectBuilder | ||||
| 		return qnil, fmt.Errorf("user context is nil") | ||||
| 	} | ||||
|  | ||||
| 	switch { | ||||
| 	// case len(user.Roles) == 1 && user.HasRole(schema.RoleApi): // API-User : Access NodeInfos | ||||
| 	// 	return query, nil | ||||
| 	case user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}): // Admin & Support : Access NodeInfos | ||||
| 		return query, nil | ||||
| 	default: // No known Role: No Access, return error | ||||
| 		var qnil sq.SelectBuilder | ||||
| 		return qnil, fmt.Errorf("user has no or unknown roles") | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -28,6 +28,7 @@ var groupBy2column = map[model.Aggregate]string{ | ||||
|  | ||||
| var sortBy2column = map[model.SortByAggregate]string{ | ||||
| 	model.SortByAggregateTotaljobs:      "totalJobs", | ||||
| 	model.SortByAggregateTotalusers:     "totalUsers", | ||||
| 	model.SortByAggregateTotalwalltime:  "totalWalltime", | ||||
| 	model.SortByAggregateTotalnodes:     "totalNodes", | ||||
| 	model.SortByAggregateTotalnodehours: "totalNodeHours", | ||||
| @@ -76,8 +77,12 @@ func (r *JobRepository) buildStatsQuery( | ||||
| 	// fmt.Sprintf(`CAST(ROUND((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) / 3600) as %s) as value`, time.Now().Unix(), castType) | ||||
|  | ||||
| 	if col != "" { | ||||
| 		// Scan columns: id, totalJobs, name, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours | ||||
| 		query = sq.Select(col, "COUNT(job.id) as totalJobs", "name", | ||||
| 		// Scan columns: id, name, totalJobs, totalUsers, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours | ||||
| 		query = sq.Select( | ||||
| 			col, | ||||
| 			"name", | ||||
| 			"COUNT(job.id) as totalJobs", | ||||
| 			"COUNT(DISTINCT job.hpc_user) AS totalUsers", | ||||
| 			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s) as totalWalltime`, time.Now().Unix(), castType), | ||||
| 			fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s) as totalNodes`, castType), | ||||
| 			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s) as totalNodeHours`, time.Now().Unix(), castType), | ||||
| @@ -87,8 +92,10 @@ func (r *JobRepository) buildStatsQuery( | ||||
| 			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_acc) / 3600) as %s) as totalAccHours`, time.Now().Unix(), castType), | ||||
| 		).From("job").LeftJoin("hpc_user ON hpc_user.username = job.hpc_user").GroupBy(col) | ||||
| 	} else { | ||||
| 		// Scan columns: totalJobs, name, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours | ||||
| 		query = sq.Select("COUNT(job.id)", | ||||
| 		// Scan columns: totalJobs, totalUsers, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours | ||||
| 		query = sq.Select( | ||||
| 			"COUNT(job.id) as totalJobs", | ||||
| 			"COUNT(DISTINCT job.hpc_user) AS totalUsers", | ||||
| 			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END)) / 3600) as %s)`, time.Now().Unix(), castType), | ||||
| 			fmt.Sprintf(`CAST(SUM(job.num_nodes) as %s)`, castType), | ||||
| 			fmt.Sprintf(`CAST(ROUND(SUM((CASE WHEN job.job_state = "running" THEN %d - job.start_time ELSE job.duration END) * job.num_nodes) / 3600) as %s)`, time.Now().Unix(), castType), | ||||
| @@ -167,14 +174,14 @@ func (r *JobRepository) JobsStatsGrouped( | ||||
| 	for rows.Next() { | ||||
| 		var id sql.NullString | ||||
| 		var name sql.NullString | ||||
| 		var jobs, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64 | ||||
| 		if err := rows.Scan(&id, &jobs, &name, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil { | ||||
| 		var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64 | ||||
| 		if err := rows.Scan(&id, &name, &jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil { | ||||
| 			cclog.Warn("Error while scanning rows") | ||||
| 			return nil, err | ||||
| 		} | ||||
|  | ||||
| 		if id.Valid { | ||||
| 			var totalJobs, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours int | ||||
| 			var totalJobs, totalUsers, totalWalltime, totalNodes, totalNodeHours, totalCores, totalCoreHours, totalAccs, totalAccHours int | ||||
| 			var personName string | ||||
|  | ||||
| 			if name.Valid { | ||||
| @@ -185,6 +192,10 @@ func (r *JobRepository) JobsStatsGrouped( | ||||
| 				totalJobs = int(jobs.Int64) | ||||
| 			} | ||||
|  | ||||
| 			if users.Valid { | ||||
| 				totalUsers = int(users.Int64) | ||||
| 			} | ||||
|  | ||||
| 			if walltime.Valid { | ||||
| 				totalWalltime = int(walltime.Int64) | ||||
| 			} | ||||
| @@ -228,8 +239,9 @@ func (r *JobRepository) JobsStatsGrouped( | ||||
| 				stats = append(stats, | ||||
| 					&model.JobsStatistics{ | ||||
| 						ID:             id.String, | ||||
| 						TotalJobs:      int(jobs.Int64), | ||||
| 						TotalWalltime:  int(walltime.Int64), | ||||
| 						TotalJobs:      totalJobs, | ||||
| 						TotalUsers:     totalUsers, | ||||
| 						TotalWalltime:  totalWalltime, | ||||
| 						TotalNodes:     totalNodes, | ||||
| 						TotalNodeHours: totalNodeHours, | ||||
| 						TotalCores:     totalCores, | ||||
| @@ -259,8 +271,8 @@ func (r *JobRepository) JobsStats( | ||||
| 	row := query.RunWith(r.DB).QueryRow() | ||||
| 	stats := make([]*model.JobsStatistics, 0, 1) | ||||
|  | ||||
| 	var jobs, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64 | ||||
| 	if err := row.Scan(&jobs, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil { | ||||
| 	var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours sql.NullInt64 | ||||
| 	if err := row.Scan(&jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours); err != nil { | ||||
| 		cclog.Warn("Error while scanning rows") | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -280,6 +292,7 @@ func (r *JobRepository) JobsStats( | ||||
| 		stats = append(stats, | ||||
| 			&model.JobsStatistics{ | ||||
| 				TotalJobs:      int(jobs.Int64), | ||||
| 				TotalUsers:     int(users.Int64), | ||||
| 				TotalWalltime:  int(walltime.Int64), | ||||
| 				TotalNodeHours: totalNodeHours, | ||||
| 				TotalCoreHours: totalCoreHours, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user