switch nodeList logic to SQLite as source of truth, fix nodeList continuous scroll

- keep notindb logic for now
This commit is contained in:
Christoph Kluge
2025-11-20 12:18:13 +01:00
parent 90c3381954
commit 399af8592c
10 changed files with 254 additions and 304 deletions

View File

@@ -382,7 +382,7 @@ func (r *queryResolver) Node(ctx context.Context, id string) (*schema.Node, erro
// Nodes is the resolver for the nodes field. // Nodes is the resolver for the nodes field.
func (r *queryResolver) Nodes(ctx context.Context, filter []*model.NodeFilter, order *model.OrderByInput) (*model.NodeStateResultList, error) { func (r *queryResolver) Nodes(ctx context.Context, filter []*model.NodeFilter, order *model.OrderByInput) (*model.NodeStateResultList, error) {
repo := repository.GetNodeRepository() repo := repository.GetNodeRepository()
nodes, err := repo.QueryNodes(ctx, filter, order) nodes, err := repo.QueryNodes(ctx, filter, nil, order) // Ignore Paging, Order Unused
count := len(nodes) count := len(nodes)
return &model.NodeStateResultList{Items: nodes, Count: &count}, err return &model.NodeStateResultList{Items: nodes, Count: &count}, err
} }
@@ -810,50 +810,134 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
} }
} }
// Note: This Prefilter Logic Can Be Used To Completely Switch Node Source Of Truth To SQLite DB // Build Filters
// Adapt and extend filters/paging/sorting in QueryNodes Function to return []string array of hostnames, input array to LoadNodeListData queryFilters := make([]*model.NodeFilter, 0)
// LoadNodeListData, instead of building queried nodes from topoplogy anew, directly will use QueryNodes hostname array if cluster != "" {
// Caveat: "notindb" state will not be resolvable anymore by default, or needs reverse lookup by dedicated comparison to topology data after all queryFilters = append(queryFilters, &model.NodeFilter{Cluster: &model.StringInput{Eq: &cluster}})
preFiltered := make([]string, 0) }
stateMap := make(map[string]string)
if stateFilter != "all" {
nodeRepo := repository.GetNodeRepository()
stateQuery := make([]*model.NodeFilter, 0)
// Required Filters
stateQuery = append(stateQuery, &model.NodeFilter{Cluster: &model.StringInput{Eq: &cluster}})
if subCluster != "" { if subCluster != "" {
stateQuery = append(stateQuery, &model.NodeFilter{Subcluster: &model.StringInput{Eq: &subCluster}}) queryFilters = append(queryFilters, &model.NodeFilter{Subcluster: &model.StringInput{Eq: &subCluster}})
} }
if nodeFilter != "" && stateFilter != "notindb" {
if stateFilter == "notindb" { queryFilters = append(queryFilters, &model.NodeFilter{Hostname: &model.StringInput{Contains: &nodeFilter}})
// Backward Filtering: Add Keyword, No Additional FIlters: Returns All Nodes For Cluster (and SubCluster) }
preFiltered = append(preFiltered, "exclude") if stateFilter != "all" && stateFilter != "notindb" {
} else {
// Workaround: If no nodes match, we need at least one element for trigger in LoadNodeListData
preFiltered = append(preFiltered, stateFilter)
// Forward Filtering: Match Only selected stateFilter
var queryState schema.SchedulerState = schema.SchedulerState(stateFilter) var queryState schema.SchedulerState = schema.SchedulerState(stateFilter)
stateQuery = append(stateQuery, &model.NodeFilter{SchedulerState: &queryState}) queryFilters = append(queryFilters, &model.NodeFilter{SchedulerState: &queryState})
}
// if healthFilter != "all" {
// filters = append(filters, &model.NodeFilter{HealthState: &healthFilter})
// }
// Special Case: Disable Paging for missing nodes filter, save IPP for later
var backupItems int
if stateFilter == "notindb" {
backupItems = page.ItemsPerPage
page.ItemsPerPage = -1
} }
stateNodes, serr := nodeRepo.QueryNodes(ctx, stateQuery, &model.OrderByInput{}) // Order not Used // Query Nodes From DB
nodeRepo := repository.GetNodeRepository()
rawNodes, serr := nodeRepo.QueryNodes(ctx, queryFilters, page, nil) // Order not Used
if serr != nil { if serr != nil {
cclog.Warn("error while loading node database data (Resolver.NodeMetricsList)") cclog.Warn("error while loading node database data (Resolver.NodeMetricsList)")
return nil, serr return nil, serr
} }
for _, node := range stateNodes { // Intermediate Node Result Info
preFiltered = append(preFiltered, node.Hostname) nodes := make([]string, 0)
stateMap := make(map[string]string)
for _, node := range rawNodes {
nodes = append(nodes, node.Hostname)
stateMap[node.Hostname] = string(node.NodeState) stateMap[node.Hostname] = string(node.NodeState)
} }
// Setup Vars
var countNodes int
var cerr error
var hasNextPage bool
// Special Case: Find Nodes not in DB node table but in metricStore only
if stateFilter == "notindb" {
// Reapply Original Paging
page.ItemsPerPage = backupItems
// Get Nodes From Topology
var topoNodes []string
if subCluster != "" {
scNodes := archive.NodeLists[cluster][subCluster]
topoNodes = scNodes.PrintList()
} else {
subClusterNodeLists := archive.NodeLists[cluster]
for _, nodeList := range subClusterNodeLists {
topoNodes = append(topoNodes, nodeList.PrintList()...)
}
}
// Compare to all nodes from cluster/subcluster in DB
var missingNodes []string
for _, scanNode := range topoNodes {
if !slices.Contains(nodes, scanNode) {
missingNodes = append(missingNodes, scanNode)
}
}
// Filter nodes by name
if nodeFilter != "" {
filteredNodesByName := []string{}
for _, missingNode := range missingNodes {
if strings.Contains(missingNode, nodeFilter) {
filteredNodesByName = append(filteredNodesByName, missingNode)
}
}
missingNodes = filteredNodesByName
}
// Sort Missing Nodes Alphanumerically
slices.Sort(missingNodes)
// Total Missing
countNodes = len(missingNodes)
// Apply paging
if countNodes > page.ItemsPerPage {
start := (page.Page - 1) * page.ItemsPerPage
end := start + page.ItemsPerPage
if end > countNodes {
end = countNodes
hasNextPage = false
} else {
hasNextPage = true
}
nodes = missingNodes[start:end]
} else {
nodes = missingNodes
} }
data, totalNodes, hasNextPage, err := metricDataDispatcher.LoadNodeListData(cluster, subCluster, nodeFilter, preFiltered, metrics, scopes, *resolution, from, to, page, ctx) } else {
// DB Nodes: Count and Find Next Page
countNodes, cerr = nodeRepo.CountNodes(ctx, queryFilters)
if cerr != nil {
cclog.Warn("error while counting node database data (Resolver.NodeMetricsList)")
return nil, cerr
}
// Example Page 4 @ 10 IpP : Does item 41 exist?
// Minimal Page 41 @ 1 IpP : If len(result) is 1, Page 5 exists.
nextPage := &model.PageRequest{
ItemsPerPage: 1,
Page: ((page.Page * page.ItemsPerPage) + 1),
}
nextNodes, err := nodeRepo.QueryNodes(ctx, queryFilters, nextPage, nil) // Order not Used
if err != nil {
cclog.Warn("Error while querying next nodes")
return nil, err
}
hasNextPage = len(nextNodes) == 1
}
// Load Metric Data For Specified Nodes Only
data, err := metricDataDispatcher.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx)
if err != nil { if err != nil {
cclog.Warn("error while loading node data (Resolver.NodeMetricsList") cclog.Warn("error while loading node data (Resolver.NodeMetricsList")
return nil, err return nil, err
} }
// Build Result
nodeMetricsList := make([]*model.NodeMetrics, 0, len(data)) nodeMetricsList := make([]*model.NodeMetrics, 0, len(data))
for hostname, metrics := range data { for hostname, metrics := range data {
host := &model.NodeMetrics{ host := &model.NodeMetrics{
@@ -879,9 +963,10 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
nodeMetricsList = append(nodeMetricsList, host) nodeMetricsList = append(nodeMetricsList, host)
} }
// Final Return
nodeMetricsListResult := &model.NodesResultList{ nodeMetricsListResult := &model.NodesResultList{
Items: nodeMetricsList, Items: nodeMetricsList,
TotalNodes: &totalNodes, TotalNodes: &countNodes,
HasNextPage: &hasNextPage, HasNextPage: &hasNextPage,
} }

View File

@@ -11,7 +11,6 @@ import (
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/metricdata"
"github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
@@ -332,18 +331,17 @@ func LoadNodeData(
} }
func LoadNodeListData( func LoadNodeListData(
cluster, subCluster, nodeFilter string, cluster, subCluster string,
preFiltered []string, nodes []string,
metrics []string, metrics []string,
scopes []schema.MetricScope, scopes []schema.MetricScope,
resolution int, resolution int,
from, to time.Time, from, to time.Time,
page *model.PageRequest,
ctx context.Context, ctx context.Context,
) (map[string]schema.JobData, int, bool, error) { ) (map[string]schema.JobData, error) {
repo, err := metricdata.GetMetricDataRepo(cluster) repo, err := metricdata.GetMetricDataRepo(cluster)
if err != nil { if err != nil {
return nil, 0, false, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster) return nil, fmt.Errorf("METRICDATA/METRICDATA > no metric data repository configured for '%s'", cluster)
} }
if metrics == nil { if metrics == nil {
@@ -352,13 +350,13 @@ func LoadNodeListData(
} }
} }
data, totalNodes, hasNextPage, err := repo.LoadNodeListData(cluster, subCluster, nodeFilter, preFiltered, metrics, scopes, resolution, from, to, page, ctx) data, err := repo.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, resolution, from, to, ctx)
if err != nil { if err != nil {
if len(data) != 0 { if len(data) != 0 {
cclog.Warnf("partial error: %s", err.Error()) cclog.Warnf("partial error: %s", err.Error())
} else { } else {
cclog.Error("Error while loading node data from metric repository") cclog.Error("Error while loading node data from metric repository")
return nil, totalNodes, hasNextPage, err return nil, err
} }
} }
@@ -376,8 +374,8 @@ func LoadNodeListData(
} }
if data == nil { if data == nil {
return nil, totalNodes, hasNextPage, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster) return nil, fmt.Errorf("METRICDATA/METRICDATA > the metric data repository for '%s' does not support this query", cluster)
} }
return data, totalNodes, hasNextPage, nil return data, nil
} }

View File

@@ -9,13 +9,10 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"slices"
"sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/memorystore" "github.com/ClusterCockpit/cc-backend/internal/memorystore"
"github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
@@ -678,84 +675,20 @@ func (ccms *CCMetricStoreInternal) LoadNodeData(
// Used for Systems-View Node-List // Used for Systems-View Node-List
func (ccms *CCMetricStoreInternal) LoadNodeListData( func (ccms *CCMetricStoreInternal) LoadNodeListData(
cluster, subCluster, nodeFilter string, cluster, subCluster string,
preFiltered []string, nodes []string,
metrics []string, metrics []string,
scopes []schema.MetricScope, scopes []schema.MetricScope,
resolution int, resolution int,
from, to time.Time, from, to time.Time,
page *model.PageRequest,
ctx context.Context, ctx context.Context,
) (map[string]schema.JobData, int, bool, error) { ) (map[string]schema.JobData, error) {
// 0) Init additional vars
var totalNodes int = 0
var hasNextPage bool = false
// 1) Get list of all nodes
var nodes []string
if subCluster != "" {
scNodes := archive.NodeLists[cluster][subCluster]
nodes = scNodes.PrintList()
} else {
subClusterNodeLists := archive.NodeLists[cluster]
for _, nodeList := range subClusterNodeLists {
nodes = append(nodes, nodeList.PrintList()...)
}
}
// 2.1) Filter nodes by name
if nodeFilter != "" {
filteredNodesByName := []string{}
for _, node := range nodes {
if strings.Contains(node, nodeFilter) {
filteredNodesByName = append(filteredNodesByName, node)
}
}
nodes = filteredNodesByName
}
// 2.2) Filter nodes by state using prefiltered match array
if len(preFiltered) > 0 {
filteredNodesByState := []string{}
if preFiltered[0] == "exclude" { // Backwards: PreFiltered contains all Nodes in DB > Return Missing Nodes
for _, node := range nodes {
if !slices.Contains(preFiltered, node) {
filteredNodesByState = append(filteredNodesByState, node)
}
}
} else { // Forwards: Prefiltered contains specific nodeState > Return Matches
for _, node := range nodes {
if slices.Contains(preFiltered, node) {
filteredNodesByState = append(filteredNodesByState, node)
}
}
}
nodes = filteredNodesByState
}
// 2.3) Count total nodes && Sort nodes -> Sorting invalidated after return ...
totalNodes = len(nodes)
sort.Strings(nodes)
// 3) Apply paging
if len(nodes) > page.ItemsPerPage {
start := (page.Page - 1) * page.ItemsPerPage
end := start + page.ItemsPerPage
if end >= len(nodes) {
end = len(nodes)
hasNextPage = false
} else {
hasNextPage = true
}
nodes = nodes[start:end]
}
// Note: Order of node data is not guaranteed after this point, but contents match page and filter criteria
// Note: Order of node data is not guaranteed after this point
queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution)) queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, int64(resolution))
if err != nil { if err != nil {
cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error()) cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error())
return nil, totalNodes, hasNextPage, err return nil, err
} }
req := memorystore.APIQueryRequest{ req := memorystore.APIQueryRequest{
@@ -770,7 +703,7 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData(
resBody, err := memorystore.FetchData(req) resBody, err := memorystore.FetchData(req)
if err != nil { if err != nil {
cclog.Errorf("Error while fetching data : %s", err.Error()) cclog.Errorf("Error while fetching data : %s", err.Error())
return nil, totalNodes, hasNextPage, err return nil, err
} }
var errors []string var errors []string
@@ -850,10 +783,10 @@ func (ccms *CCMetricStoreInternal) LoadNodeListData(
if len(errors) != 0 { if len(errors) != 0 {
/* Returns list of "partial errors" */ /* Returns list of "partial errors" */
return data, totalNodes, hasNextPage, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
} }
return data, totalNodes, hasNextPage, nil return data, nil
} }
func (ccms *CCMetricStoreInternal) buildNodeQueries( func (ccms *CCMetricStoreInternal) buildNodeQueries(

View File

@@ -11,12 +11,9 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"slices"
"sort"
"strings" "strings"
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema" "github.com/ClusterCockpit/cc-lib/schema"
@@ -800,85 +797,20 @@ func (ccms *CCMetricStore) LoadNodeData(
// Used for Systems-View Node-List // Used for Systems-View Node-List
func (ccms *CCMetricStore) LoadNodeListData( func (ccms *CCMetricStore) LoadNodeListData(
cluster, subCluster, nodeFilter string, cluster, subCluster string,
preFiltered []string, nodes []string,
metrics []string, metrics []string,
scopes []schema.MetricScope, scopes []schema.MetricScope,
resolution int, resolution int,
from, to time.Time, from, to time.Time,
page *model.PageRequest,
ctx context.Context, ctx context.Context,
) (map[string]schema.JobData, int, bool, error) { ) (map[string]schema.JobData, error) {
// 0) Init additional vars
var totalNodes int = 0
var hasNextPage bool = false
// 1) Get list of all nodes
var nodes []string
if subCluster != "" {
scNodes := archive.NodeLists[cluster][subCluster]
nodes = scNodes.PrintList()
} else {
subClusterNodeLists := archive.NodeLists[cluster]
for _, nodeList := range subClusterNodeLists {
nodes = append(nodes, nodeList.PrintList()...)
}
}
// 2.1) Filter nodes by name
if nodeFilter != "" {
filteredNodesByName := []string{}
for _, node := range nodes {
if strings.Contains(node, nodeFilter) {
filteredNodesByName = append(filteredNodesByName, node)
}
}
nodes = filteredNodesByName
}
// 2.2) Filter nodes by state using prefiltered match array
if len(preFiltered) > 0 {
filteredNodesByState := []string{}
if preFiltered[0] == "exclude" { // Backwards: PreFiltered contains all Nodes in DB > Return Missing Nodes
for _, node := range nodes {
if !slices.Contains(preFiltered, node) {
filteredNodesByState = append(filteredNodesByState, node)
}
}
} else { // Forwards: Prefiltered contains specific nodeState > Return Matches
for _, node := range nodes {
if slices.Contains(preFiltered, node) {
filteredNodesByState = append(filteredNodesByState, node)
}
}
}
nodes = filteredNodesByState
}
// 2.3) Count total nodes && Sort nodes -> Sorting invalidated after return ...
totalNodes = len(nodes)
sort.Strings(nodes)
// 3) Apply paging
if len(nodes) > page.ItemsPerPage {
start := (page.Page - 1) * page.ItemsPerPage
end := start + page.ItemsPerPage
if end > len(nodes) {
end = len(nodes)
hasNextPage = false
} else {
hasNextPage = true
}
nodes = nodes[start:end]
}
// Note: Order of node data is not guaranteed after this point, but contents match page and filter criteria
// Note: Order of node data is not guaranteed after this point
queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution) queries, assignedScope, err := ccms.buildNodeQueries(cluster, subCluster, nodes, metrics, scopes, resolution)
if err != nil { if err != nil {
cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error()) cclog.Errorf("Error while building node queries for Cluster %s, SubCLuster %s, Metrics %v, Scopes %v: %s", cluster, subCluster, metrics, scopes, err.Error())
return nil, totalNodes, hasNextPage, err return nil, err
} }
req := ApiQueryRequest{ req := ApiQueryRequest{
@@ -893,7 +825,7 @@ func (ccms *CCMetricStore) LoadNodeListData(
resBody, err := ccms.doRequest(ctx, &req) resBody, err := ccms.doRequest(ctx, &req)
if err != nil { if err != nil {
cclog.Errorf("Error while performing request: %s", err.Error()) cclog.Errorf("Error while performing request: %s", err.Error())
return nil, totalNodes, hasNextPage, err return nil, err
} }
var errors []string var errors []string
@@ -973,10 +905,10 @@ func (ccms *CCMetricStore) LoadNodeListData(
if len(errors) != 0 { if len(errors) != 0 {
/* Returns list of "partial errors" */ /* Returns list of "partial errors" */
return data, totalNodes, hasNextPage, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", ")) return data, fmt.Errorf("METRICDATA/CCMS > Errors: %s", strings.Join(errors, ", "))
} }
return data, totalNodes, hasNextPage, nil return data, nil
} }
func (ccms *CCMetricStore) buildNodeQueries( func (ccms *CCMetricStore) buildNodeQueries(

View File

@@ -12,7 +12,6 @@ import (
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/internal/memorystore" "github.com/ClusterCockpit/cc-backend/internal/memorystore"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema" "github.com/ClusterCockpit/cc-lib/schema"
@@ -36,7 +35,7 @@ type MetricDataRepository interface {
LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error) LoadNodeData(cluster string, metrics, nodes []string, scopes []schema.MetricScope, from, to time.Time, ctx context.Context) (map[string]map[string][]*schema.JobMetric, error)
// Return a map of hosts to a map of metrics to a map of scopes for multiple nodes. // Return a map of hosts to a map of metrics to a map of scopes for multiple nodes.
LoadNodeListData(cluster, subCluster, nodeFilter string, preFiltered []string, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, page *model.PageRequest, ctx context.Context) (map[string]schema.JobData, int, bool, error) LoadNodeListData(cluster, subCluster string, nodes, metrics []string, scopes []schema.MetricScope, resolution int, from, to time.Time, ctx context.Context) (map[string]schema.JobData, error)
} }
var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{} var metricDataRepos map[string]MetricDataRepository = map[string]MetricDataRepository{}

View File

@@ -14,14 +14,12 @@ import (
"net/http" "net/http"
"os" "os"
"regexp" "regexp"
"slices"
"sort" "sort"
"strings" "strings"
"sync" "sync"
"text/template" "text/template"
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/archive"
cclog "github.com/ClusterCockpit/cc-lib/ccLogger" cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
"github.com/ClusterCockpit/cc-lib/schema" "github.com/ClusterCockpit/cc-lib/schema"
@@ -495,82 +493,17 @@ func (pdb *PrometheusDataRepository) LoadScopedStats(
// Implemented by NHR@FAU; Used in NodeList-View // Implemented by NHR@FAU; Used in NodeList-View
func (pdb *PrometheusDataRepository) LoadNodeListData( func (pdb *PrometheusDataRepository) LoadNodeListData(
cluster, subCluster, nodeFilter string, cluster, subCluster string,
preFiltered []string, nodes []string,
metrics []string, metrics []string,
scopes []schema.MetricScope, scopes []schema.MetricScope,
resolution int, resolution int,
from, to time.Time, from, to time.Time,
page *model.PageRequest,
ctx context.Context, ctx context.Context,
) (map[string]schema.JobData, int, bool, error) { ) (map[string]schema.JobData, error) {
// Assumption: pdb.loadData() only returns series node-scope - use node scope for NodeList // Assumption: pdb.loadData() only returns series node-scope - use node scope for NodeList
// 0) Init additional vars // Fetch Data, based on pdb.LoadNodeData()
var totalNodes int = 0
var hasNextPage bool = false
// 1) Get list of all nodes
var nodes []string
if subCluster != "" {
scNodes := archive.NodeLists[cluster][subCluster]
nodes = scNodes.PrintList()
} else {
subClusterNodeLists := archive.NodeLists[cluster]
for _, nodeList := range subClusterNodeLists {
nodes = append(nodes, nodeList.PrintList()...)
}
}
// 2.1) Filter nodes by name
if nodeFilter != "" {
filteredNodesByName := []string{}
for _, node := range nodes {
if strings.Contains(node, nodeFilter) {
filteredNodesByName = append(filteredNodesByName, node)
}
}
nodes = filteredNodesByName
}
// 2.2) Filter nodes by state using prefiltered match array
if len(preFiltered) > 0 {
filteredNodesByState := []string{}
if preFiltered[0] == "exclude" { // Backwards: PreFiltered contains all Nodes in DB > Return Missing Nodes
for _, node := range nodes {
if !slices.Contains(preFiltered, node) {
filteredNodesByState = append(filteredNodesByState, node)
}
}
} else { // Forwards: Prefiltered contains specific nodeState > Return Matches
for _, node := range nodes {
if slices.Contains(preFiltered, node) {
filteredNodesByState = append(filteredNodesByState, node)
}
}
}
nodes = filteredNodesByState
}
// 2.3) Count total nodes && Sort nodes -> Sorting invalidated after return ...
totalNodes = len(nodes)
sort.Strings(nodes)
// 3) Apply paging
if len(nodes) > page.ItemsPerPage {
start := (page.Page - 1) * page.ItemsPerPage
end := start + page.ItemsPerPage
if end >= len(nodes) {
end = len(nodes)
hasNextPage = false
} else {
hasNextPage = true
}
nodes = nodes[start:end]
}
// 4) Fetch Data, based on pdb.LoadNodeData()
t0 := time.Now() t0 := time.Now()
// Map of hosts of jobData // Map of hosts of jobData
data := make(map[string]schema.JobData) data := make(map[string]schema.JobData)
@@ -593,12 +526,12 @@ func (pdb *PrometheusDataRepository) LoadNodeListData(
metricConfig := archive.GetMetricConfig(cluster, metric) metricConfig := archive.GetMetricConfig(cluster, metric)
if metricConfig == nil { if metricConfig == nil {
cclog.Warnf("Error in LoadNodeListData: Metric %s for cluster %s not configured", metric, cluster) cclog.Warnf("Error in LoadNodeListData: Metric %s for cluster %s not configured", metric, cluster)
return nil, totalNodes, hasNextPage, errors.New("Prometheus config error") return nil, errors.New("Prometheus config error")
} }
query, err := pdb.FormatQuery(metric, scope, nodes, cluster) query, err := pdb.FormatQuery(metric, scope, nodes, cluster)
if err != nil { if err != nil {
cclog.Warn("Error while formatting prometheus query") cclog.Warn("Error while formatting prometheus query")
return nil, totalNodes, hasNextPage, err return nil, err
} }
// ranged query over all nodes // ranged query over all nodes
@@ -610,7 +543,7 @@ func (pdb *PrometheusDataRepository) LoadNodeListData(
result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r) result, warnings, err := pdb.queryClient.QueryRange(ctx, query, r)
if err != nil { if err != nil {
cclog.Errorf("Prometheus query error in LoadNodeData: %v\n", err) cclog.Errorf("Prometheus query error in LoadNodeData: %v\n", err)
return nil, totalNodes, hasNextPage, errors.New("Prometheus query error") return nil, errors.New("Prometheus query error")
} }
if len(warnings) > 0 { if len(warnings) > 0 {
cclog.Warnf("Warnings: %v\n", warnings) cclog.Warnf("Warnings: %v\n", warnings)
@@ -650,5 +583,5 @@ func (pdb *PrometheusDataRepository) LoadNodeListData(
} }
t1 := time.Since(t0) t1 := time.Since(t0)
cclog.Debugf("LoadNodeListData of %v nodes took %s", len(data), t1) cclog.Debugf("LoadNodeListData of %v nodes took %s", len(data), t1)
return data, totalNodes, hasNextPage, nil return data, nil
} }

View File

@@ -10,7 +10,6 @@ import (
"encoding/json" "encoding/json"
"time" "time"
"github.com/ClusterCockpit/cc-backend/internal/graph/model"
"github.com/ClusterCockpit/cc-lib/schema" "github.com/ClusterCockpit/cc-lib/schema"
) )
@@ -63,15 +62,14 @@ func (tmdr *TestMetricDataRepository) LoadNodeData(
} }
func (tmdr *TestMetricDataRepository) LoadNodeListData( func (tmdr *TestMetricDataRepository) LoadNodeListData(
cluster, subCluster, nodeFilter string, cluster, subCluster string,
preFiltered []string, nodes []string,
metrics []string, metrics []string,
scopes []schema.MetricScope, scopes []schema.MetricScope,
resolution int, resolution int,
from, to time.Time, from, to time.Time,
page *model.PageRequest,
ctx context.Context, ctx context.Context,
) (map[string]schema.JobData, int, bool, error) { ) (map[string]schema.JobData, error) {
panic("TODO") panic("TODO")
} }

View File

@@ -274,11 +274,12 @@ func (r *NodeRepository) DeleteNode(id int64) error {
func (r *NodeRepository) QueryNodes( func (r *NodeRepository) QueryNodes(
ctx context.Context, ctx context.Context,
filters []*model.NodeFilter, filters []*model.NodeFilter,
page *model.PageRequest,
order *model.OrderByInput, // Currently unused! order *model.OrderByInput, // Currently unused!
) ([]*schema.Node, error) { ) ([]*schema.Node, error) {
query, qerr := AccessCheck(ctx, query, qerr := AccessCheck(ctx,
sq.Select("hostname", "cluster", "subcluster", "node_state", sq.Select("hostname", "cluster", "subcluster", "node_state", "health_state", "MAX(time_stamp) as time").
"health_state", "MAX(time_stamp) as time").
From("node"). From("node").
Join("node_state ON node_state.node_id = node.id")) Join("node_state ON node_state.node_id = node.id"))
if qerr != nil { if qerr != nil {
@@ -286,19 +287,19 @@ func (r *NodeRepository) QueryNodes(
} }
for _, f := range filters { for _, f := range filters {
if f.Hostname != nil {
query = buildStringCondition("hostname", f.Hostname, query)
}
if f.Cluster != nil { if f.Cluster != nil {
query = buildStringCondition("cluster", f.Cluster, query) query = buildStringCondition("cluster", f.Cluster, query)
} }
if f.Subcluster != nil { if f.Subcluster != nil {
query = buildStringCondition("subcluster", f.Subcluster, query) query = buildStringCondition("subcluster", f.Subcluster, query)
} }
if f.Hostname != nil {
query = buildStringCondition("hostname", f.Hostname, query)
}
if f.SchedulerState != nil { if f.SchedulerState != nil {
query = query.Where("node_state = ?", f.SchedulerState) query = query.Where("node_state = ?", f.SchedulerState)
// Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned // Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned
now := time.Now().Unix() now := 1760097536 // time.Now().Unix()
query = query.Where(sq.Gt{"time_stamp": (now - 60)}) query = query.Where(sq.Gt{"time_stamp": (now - 60)})
} }
if f.HealthState != nil { if f.HealthState != nil {
@@ -309,9 +310,12 @@ func (r *NodeRepository) QueryNodes(
} }
} }
// Add Grouping and ORder after filters query = query.GroupBy("node_id").OrderBy("hostname ASC")
query = query.GroupBy("node_id").
OrderBy("hostname ASC") if page != nil && page.ItemsPerPage != -1 {
limit := uint64(page.ItemsPerPage)
query = query.Offset((uint64(page.Page) - 1) * limit).Limit(limit)
}
rows, err := query.RunWith(r.stmtCache).Query() rows, err := query.RunWith(r.stmtCache).Query()
if err != nil { if err != nil {
@@ -320,7 +324,7 @@ func (r *NodeRepository) QueryNodes(
return nil, err return nil, err
} }
nodes := make([]*schema.Node, 0, 50) nodes := make([]*schema.Node, 0)
for rows.Next() { for rows.Next() {
node := schema.Node{} node := schema.Node{}
var timestamp int var timestamp int
@@ -336,6 +340,67 @@ func (r *NodeRepository) QueryNodes(
return nodes, nil return nodes, nil
} }
// CountNodes returns the total matched nodes based on a node filter. It always operates
// on the last state (largest timestamp).
func (r *NodeRepository) CountNodes(
ctx context.Context,
filters []*model.NodeFilter,
) (int, error) {
query, qerr := AccessCheck(ctx,
sq.Select("time_stamp", "count(*) as countRes").
From("node").
Join("node_state ON node_state.node_id = node.id"))
if qerr != nil {
return 0, qerr
}
for _, f := range filters {
if f.Cluster != nil {
query = buildStringCondition("cluster", f.Cluster, query)
}
if f.Subcluster != nil {
query = buildStringCondition("subcluster", f.Subcluster, query)
}
if f.Hostname != nil {
query = buildStringCondition("hostname", f.Hostname, query)
}
if f.SchedulerState != nil {
query = query.Where("node_state = ?", f.SchedulerState)
// Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned
now := 1760097536 // time.Now().Unix()
query = query.Where(sq.Gt{"time_stamp": (now - 60)})
}
if f.HealthState != nil {
query = query.Where("health_state = ?", f.HealthState)
// Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned
now := time.Now().Unix()
query = query.Where(sq.Gt{"time_stamp": (now - 60)})
}
}
query = query.GroupBy("time_stamp").OrderBy("time_stamp DESC").Limit(1)
rows, err := query.RunWith(r.stmtCache).Query()
if err != nil {
queryString, queryVars, _ := query.ToSql()
cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err)
return 0, err
}
var totalNodes int
for rows.Next() {
var timestamp int
if err := rows.Scan(&timestamp, &totalNodes); err != nil {
rows.Close()
cclog.Warnf("Error while scanning rows (CountNodes) at time '%d'", timestamp)
return 0, err
}
}
return totalNodes, nil
}
func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) { func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) {
q := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state", q := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state",
"node_state.health_state", "MAX(node_state.time_stamp) as time"). "node_state.health_state", "MAX(node_state.time_stamp) as time").

View File

@@ -157,7 +157,7 @@
<!-- ROW1: Tools--> <!-- ROW1: Tools-->
<Row cols={{ xs: 2, lg: !displayNodeOverview ? (resampleConfig ? 6 : 5) : 5 }} class="mb-3"> <Row cols={{ xs: 2, lg: !displayNodeOverview ? (resampleConfig ? 6 : 5) : 5 }} class="mb-3">
{#if $initq.data} {#if $initq?.data}
<!-- List Metric Select Col--> <!-- List Metric Select Col-->
{#if !displayNodeOverview} {#if !displayNodeOverview}
<Col> <Col>

View File

@@ -15,6 +15,7 @@
--> -->
<script> <script>
import { untrack } from "svelte";
import { queryStore, gql, getContextClient, mutationStore } from "@urql/svelte"; import { queryStore, gql, getContextClient, mutationStore } from "@urql/svelte";
import { Row, Col, Card, Table, Spinner } from "@sveltestrap/sveltestrap"; import { Row, Col, Card, Table, Spinner } from "@sveltestrap/sveltestrap";
import { stickyHeader } from "../generic/utils.js"; import { stickyHeader } from "../generic/utils.js";
@@ -137,17 +138,21 @@
}); });
$effect(() => { $effect(() => {
if ($nodesQuery?.data) {
untrack(() => {
handleNodes($nodesQuery?.data?.nodeMetricsList); handleNodes($nodesQuery?.data?.nodeMetricsList);
}); });
};
});
$effect(() => { $effect(() => {
// Triggers (Except Paging) // Triggers (Except Paging)
from, to from, to
selectedMetrics, selectedResolution selectedMetrics, selectedResolution
hostnameFilter, hoststateFilter hostnameFilter, hoststateFilter
// Continous Scroll: Reset nodes and paging if parameters change: Existing entries will not match new selections // Continous Scroll: Paging if parameters change: Existing entries will not match new selections
// Nodes Array Reset in HandleNodes func
if (!usePaging) { if (!usePaging) {
nodes = [];
page = 1; page = 1;
} }
}); });
@@ -155,17 +160,19 @@
/* Functions */ /* Functions */
function handleNodes(data) { function handleNodes(data) {
if (data) { if (data) {
matchedNodes = data.totalNodes; if (usePaging) {
if (usePaging || nodes.length == 0) { // console.log('New Paging', $state.snapshot(paging))
nodes = [...data.items].sort((a, b) => a.host.localeCompare(b.host)); nodes = [...data.items].sort((a, b) => a.host.localeCompare(b.host));
} else { } else {
// Workaround to ignore secondary store triggers (reason tbd) if ($state.snapshot(page) == 1) {
const oldNodes = $state.snapshot(nodes) // console.log('Page 1 Reset', [...data.items])
const newNodes = [...data.items].map((d) => d.host) nodes = [...data.items].sort((a, b) => a.host.localeCompare(b.host));
if (!oldNodes.some((n) => newNodes.includes(n.host))) { } else {
nodes = nodes.concat([...data.items].sort((a, b) => a.host.localeCompare(b.host))) // console.log('Add Nodes', $state.snapshot(nodes), [...data.items])
}; nodes = nodes.concat([...data.items])
}; }
}
matchedNodes = data.totalNodes;
}; };
}; };