move extensive NodeMetricsList handling to node repo func

Christoph Kluge
2025-12-18 10:44:58 +01:00
parent 32e5353847
commit 19c8e9beb1
2 changed files with 142 additions and 123 deletions
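The NodeMetricsList resolver previously built the node filters, handled the notindb special case, and computed paging inline; this commit moves that logic into a new NodeRepository.GetNodesForList helper, so the resolver only requests the paged hostname list and state map and then loads metric data for those hosts.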


@@ -806,140 +806,24 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
		return nil, errors.New("you need to be administrator or support staff for this query")
	}
	nodeRepo := repository.GetNodeRepository()
	nodes, stateMap, countNodes, hasNextPage, nerr := nodeRepo.GetNodesForList(ctx, cluster, subCluster, stateFilter, nodeFilter, page)
	if nerr != nil {
		return nil, errors.New("Could not retrieve node list required for resolving NodeMetricsList")
	}
	if metrics == nil {
		for _, mc := range archive.GetCluster(cluster).MetricConfig {
			metrics = append(metrics, mc.Name)
		}
	}
	// Build Filters
	queryFilters := make([]*model.NodeFilter, 0)
	if cluster != "" {
		queryFilters = append(queryFilters, &model.NodeFilter{Cluster: &model.StringInput{Eq: &cluster}})
	}
	if subCluster != "" {
		queryFilters = append(queryFilters, &model.NodeFilter{Subcluster: &model.StringInput{Eq: &subCluster}})
	}
	if nodeFilter != "" && stateFilter != "notindb" {
		queryFilters = append(queryFilters, &model.NodeFilter{Hostname: &model.StringInput{Contains: &nodeFilter}})
	}
	if stateFilter != "all" && stateFilter != "notindb" {
		var queryState schema.SchedulerState = schema.SchedulerState(stateFilter)
		queryFilters = append(queryFilters, &model.NodeFilter{SchedulerState: &queryState})
	}
	// if healthFilter != "all" {
	// filters = append(filters, &model.NodeFilter{HealthState: &healthFilter})
	// }
	// Special Case: Disable Paging for missing nodes filter, save IPP for later
	var backupItems int
	if stateFilter == "notindb" {
		backupItems = page.ItemsPerPage
		page.ItemsPerPage = -1
	}
	// Query Nodes From DB
	nodeRepo := repository.GetNodeRepository()
	rawNodes, serr := nodeRepo.QueryNodes(ctx, queryFilters, page, nil) // Order not Used
	if serr != nil {
		cclog.Warn("error while loading node database data (Resolver.NodeMetricsList)")
		return nil, serr
	}
	// Intermediate Node Result Info
	nodes := make([]string, 0)
	stateMap := make(map[string]string)
	for _, node := range rawNodes {
		nodes = append(nodes, node.Hostname)
		stateMap[node.Hostname] = string(node.NodeState)
	}
	// Setup Vars
	var countNodes int
	var cerr error
	var hasNextPage bool
	// Special Case: Find Nodes not in DB node table but in metricStore only
	if stateFilter == "notindb" {
		// Reapply Original Paging
		page.ItemsPerPage = backupItems
		// Get Nodes From Topology
		var topoNodes []string
		if subCluster != "" {
			scNodes := archive.NodeLists[cluster][subCluster]
			topoNodes = scNodes.PrintList()
		} else {
			subClusterNodeLists := archive.NodeLists[cluster]
			for _, nodeList := range subClusterNodeLists {
				topoNodes = append(topoNodes, nodeList.PrintList()...)
			}
		}
		// Compare to all nodes from cluster/subcluster in DB
		var missingNodes []string
		for _, scanNode := range topoNodes {
			if !slices.Contains(nodes, scanNode) {
				missingNodes = append(missingNodes, scanNode)
			}
		}
		// Filter nodes by name
		if nodeFilter != "" {
			filteredNodesByName := []string{}
			for _, missingNode := range missingNodes {
				if strings.Contains(missingNode, nodeFilter) {
					filteredNodesByName = append(filteredNodesByName, missingNode)
				}
			}
			missingNodes = filteredNodesByName
		}
		// Sort Missing Nodes Alphanumerically
		slices.Sort(missingNodes)
		// Total Missing
		countNodes = len(missingNodes)
		// Apply paging
		if countNodes > page.ItemsPerPage {
			start := (page.Page - 1) * page.ItemsPerPage
			end := start + page.ItemsPerPage
			if end > countNodes {
				end = countNodes
				hasNextPage = false
			} else {
				hasNextPage = true
			}
			nodes = missingNodes[start:end]
		} else {
			nodes = missingNodes
		}
	} else {
		// DB Nodes: Count and Find Next Page
		countNodes, cerr = nodeRepo.CountNodes(ctx, queryFilters)
		if cerr != nil {
			cclog.Warn("error while counting node database data (Resolver.NodeMetricsList)")
			return nil, cerr
		}
		// Example Page 4 @ 10 IpP : Does item 41 exist?
		// Minimal Page 41 @ 1 IpP : If len(result) is 1, Page 5 exists.
		nextPage := &model.PageRequest{
			ItemsPerPage: 1,
			Page: ((page.Page * page.ItemsPerPage) + 1),
		}
		nextNodes, err := nodeRepo.QueryNodes(ctx, queryFilters, nextPage, nil) // Order not Used
		if err != nil {
			cclog.Warn("Error while querying next nodes")
			return nil, err
		}
		hasNextPage = len(nextNodes) == 1
	}
	// Load Metric Data For Specified Nodes Only
	data, err := metricDataDispatcher.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx)
	if err != nil {
		cclog.Warn("error while loading node data (Resolver.NodeMetricsList)")
		return nil, err
	}
	// Build Result
	nodeMetricsList := make([]*model.NodeMetrics, 0, len(data))
	for hostname, metrics := range data {
		host := &model.NodeMetrics{
@@ -965,7 +849,6 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
		nodeMetricsList = append(nodeMetricsList, host)
	}
	// Final Return
	nodeMetricsListResult := &model.NodesResultList{
		Items: nodeMetricsList,
		TotalNodes: &countNodes,
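A hypothetical caller sketch, not part of this commit, showing how the new repository helper is consumed. The cluster name, scheduler state value, and paging numbers are illustrative, and the import paths assume cc-backend's usual module layout (the packages are internal, so such a caller has to live inside the module):

package example

import (
	"context"

	"github.com/ClusterCockpit/cc-backend/internal/graph/model"
	"github.com/ClusterCockpit/cc-backend/internal/repository"
)

// listAllocatedNodes is a hypothetical helper: it fetches one page of
// hostnames in the (assumed) "allocated" scheduler state plus their state map.
func listAllocatedNodes(ctx context.Context) ([]string, map[string]string, error) {
	nodeRepo := repository.GetNodeRepository()
	page := &model.PageRequest{ItemsPerPage: 10, Page: 1} // illustrative paging

	// nodes: paged hostnames, stateMap: hostname -> scheduler state,
	// countNodes: total matching nodes, hasNextPage: paging hint for the UI.
	nodes, stateMap, countNodes, hasNextPage, err := nodeRepo.GetNodesForList(
		ctx, "fritz", "", "allocated", "", page)
	if err != nil {
		return nil, nil, err
	}
	_, _ = countNodes, hasNextPage // not needed by this sketch
	return nodes, stateMap, nil
}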


@@ -10,6 +10,8 @@ import (
"database/sql"
"encoding/json"
"fmt"
"slices"
"strings"
"sync"
"time"
@@ -551,6 +553,140 @@ func (r *NodeRepository) CountStatesTimed(ctx context.Context, filters []*model.
	return timedStates, nil
}

func (r *NodeRepository) GetNodesForList(
	ctx context.Context,
	cluster string,
	subCluster string,
	stateFilter string,
	nodeFilter string,
	page *model.PageRequest,
) ([]string, map[string]string, int, bool, error) {
	// Init Return Vars
	nodes := make([]string, 0)
	stateMap := make(map[string]string)
	countNodes := 0
	hasNextPage := false
	// Build Filters
	queryFilters := make([]*model.NodeFilter, 0)
	if cluster != "" {
		queryFilters = append(queryFilters, &model.NodeFilter{Cluster: &model.StringInput{Eq: &cluster}})
	}
	if subCluster != "" {
		queryFilters = append(queryFilters, &model.NodeFilter{Subcluster: &model.StringInput{Eq: &subCluster}})
	}
	if nodeFilter != "" && stateFilter != "notindb" {
		queryFilters = append(queryFilters, &model.NodeFilter{Hostname: &model.StringInput{Contains: &nodeFilter}})
	}
	if stateFilter != "all" && stateFilter != "notindb" {
		var queryState schema.SchedulerState = schema.SchedulerState(stateFilter)
		queryFilters = append(queryFilters, &model.NodeFilter{SchedulerState: &queryState})
	}
	// if healthFilter != "all" {
	// filters = append(filters, &model.NodeFilter{HealthState: &healthFilter})
	// }
	// Special Case: Disable Paging for missing nodes filter, save IPP for later
	var backupItems int
	if stateFilter == "notindb" {
		backupItems = page.ItemsPerPage
		page.ItemsPerPage = -1
	}
	// Query Nodes From DB
	rawNodes, serr := r.QueryNodes(ctx, queryFilters, page, nil) // Order not Used
	if serr != nil {
		cclog.Warn("error while loading node database data (Resolver.NodeMetricsList)")
		return nil, nil, 0, false, serr
	}
	// Intermediate Node Result Info
	for _, node := range rawNodes {
		if node == nil {
			continue
		}
		nodes = append(nodes, node.Hostname)
		stateMap[node.Hostname] = string(node.NodeState)
	}
	// Special Case: Find Nodes not in DB node table but in metricStore only
	if stateFilter == "notindb" {
		// Reapply Original Paging
		page.ItemsPerPage = backupItems
		// Get Nodes From Topology
		var topoNodes []string
		if subCluster != "" {
			scNodes := archive.NodeLists[cluster][subCluster]
			topoNodes = scNodes.PrintList()
		} else {
			subClusterNodeLists := archive.NodeLists[cluster]
			for _, nodeList := range subClusterNodeLists {
				topoNodes = append(topoNodes, nodeList.PrintList()...)
			}
		}
		// Compare to all nodes from cluster/subcluster in DB
		var missingNodes []string
		for _, scanNode := range topoNodes {
			if !slices.Contains(nodes, scanNode) {
				missingNodes = append(missingNodes, scanNode)
			}
		}
		// Filter nodes by name
		if nodeFilter != "" {
			filteredNodesByName := []string{}
			for _, missingNode := range missingNodes {
				if strings.Contains(missingNode, nodeFilter) {
					filteredNodesByName = append(filteredNodesByName, missingNode)
				}
			}
			missingNodes = filteredNodesByName
		}
		// Sort Missing Nodes Alphanumerically
		slices.Sort(missingNodes)
		// Total Missing
		countNodes = len(missingNodes)
		// Apply paging
		if countNodes > page.ItemsPerPage {
			start := (page.Page - 1) * page.ItemsPerPage
			end := start + page.ItemsPerPage
			if end > countNodes {
				end = countNodes
				hasNextPage = false
			} else {
				hasNextPage = true
			}
			nodes = missingNodes[start:end]
		} else {
			nodes = missingNodes
		}
	} else {
		// DB Nodes: Count and Find Next Page
		var cerr error
		countNodes, cerr = r.CountNodes(ctx, queryFilters)
		if cerr != nil {
			cclog.Warn("error while counting node database data (Resolver.NodeMetricsList)")
			return nil, nil, 0, false, cerr
		}
		// Example Page 4 @ 10 IpP : Does item 41 exist?
		// Minimal Page 41 @ 1 IpP : If len(result) is 1, Page 5 exists.
		nextPage := &model.PageRequest{
			ItemsPerPage: 1,
			Page: ((page.Page * page.ItemsPerPage) + 1),
		}
		nextNodes, err := r.QueryNodes(ctx, queryFilters, nextPage, nil) // Order not Used
		if err != nil {
			cclog.Warn("Error while querying next nodes")
			return nil, nil, 0, false, err
		}
		hasNextPage = len(nextNodes) == 1
	}
	return nodes, stateMap, countNodes, hasNextPage, nil
}
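The next-page check above probes for exactly one item at the position that would begin the following page (page 4 at 10 items per page probes item 41). A standalone sketch of that arithmetic, separate from the repository code:

package main

import "fmt"

// hasNext reports whether a further page exists: with 1-based pages, the
// first item of page+1 is item page*itemsPerPage + 1, so a next page exists
// exactly when the total count reaches that index.
func hasNext(totalItems, page, itemsPerPage int) bool {
	return totalItems >= page*itemsPerPage+1
}

func main() {
	fmt.Println(hasNext(41, 4, 10)) // true:  item 41 exists, so page 5 has content
	fmt.Println(hasNext(40, 4, 10)) // false: exactly four full pages, no page 5
}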
func AccessCheck(ctx context.Context, query sq.SelectBuilder) (sq.SelectBuilder, error) {
	user := GetUserFromContext(ctx)
	return AccessCheckWithUser(user, query)