From 8b0881fb177622f076dfa6b5b9cecce31c8e9860 Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Wed, 18 Mar 2026 11:20:10 +0100
Subject: [PATCH] Exclude down nodes from HealthCheck

Entire-Checkpoint: 0c3347168c79
---
 internal/api/nats.go | 26 ++++++++++++++++++++------
 internal/api/node.go | 32 ++++++++++++++++++++++++--------
 2 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/internal/api/nats.go b/internal/api/nats.go
index db229a04..db33c0e2 100644
--- a/internal/api/nats.go
+++ b/internal/api/nats.go
@@ -402,12 +402,21 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) {
 	repo := repository.GetNodeRepository()
 	requestReceived := time.Now().Unix()
 
-	// Build nodeList per subcluster for health check
+	// Pre-compute node states; only include non-down nodes in health check
+	nodeStates := make(map[string]schema.SchedulerState, len(req.Nodes))
+	for _, node := range req.Nodes {
+		nodeStates[node.Hostname] = determineState(node.States)
+	}
+
+	// Build nodeList per subcluster for health check, skipping down nodes
 	m := make(map[string][]string)
 	metricNames := make(map[string][]string)
 	healthResults := make(map[string]metricstore.HealthCheckResult)
 
 	for _, node := range req.Nodes {
+		if nodeStates[node.Hostname] == schema.NodeStateDown {
+			continue
+		}
 		if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil {
 			m[sc] = append(m[sc], node.Hostname)
 		}
@@ -436,12 +445,17 @@
 	updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes))
 
 	for _, node := range req.Nodes {
-		state := determineState(node.States)
-		healthState := schema.MonitoringStateFailed
+		state := nodeStates[node.Hostname]
+		var healthState schema.MonitoringState
 		var healthMetrics string
-		if result, ok := healthResults[node.Hostname]; ok {
-			healthState = result.State
-			healthMetrics = result.HealthMetrics
+		if state == schema.NodeStateDown {
+			healthState = schema.MonitoringStateFull
+		} else {
+			healthState = schema.MonitoringStateFailed
+			if result, ok := healthResults[node.Hostname]; ok {
+				healthState = result.State
+				healthMetrics = result.HealthMetrics
+			}
 		}
 		nodeState := schema.NodeStateDB{
 			TimeStamp: requestReceived,
diff --git a/internal/api/node.go b/internal/api/node.go
index c6727866..2b86fbad 100644
--- a/internal/api/node.go
+++ b/internal/api/node.go
@@ -36,7 +36,7 @@ func metricListToNames(metricList map[string]*schema.Metric) []string {
 
 // determineState resolves multiple states to a single state using priority order:
 // allocated > reserved > idle > down > mixed.
-// Exception: if both idle and down are present, idle is returned.
+// Exception: if both idle and down are present, down is returned.
 func determineState(states []string) schema.SchedulerState {
 	stateSet := make(map[string]bool, len(states))
 	for _, s := range states {
@@ -48,6 +48,8 @@
 		return schema.NodeStateAllocated
 	case stateSet["reserved"]:
 		return schema.NodeStateReserved
+	case stateSet["idle"] && stateSet["down"]:
+		return schema.NodeStateDown
 	case stateSet["idle"]:
 		return schema.NodeStateIdle
 	case stateSet["down"]:
@@ -84,14 +86,23 @@
 func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
 	requestReceived := time.Now().Unix()
 	repo := repository.GetNodeRepository()
 
+	// Step 1: Pre-compute node states; only include non-down nodes in health check
+	nodeStates := make(map[string]schema.SchedulerState, len(req.Nodes))
+	for _, node := range req.Nodes {
+		nodeStates[node.Hostname] = determineState(node.States)
+	}
+
 	m := make(map[string][]string)
 	metricNames := make(map[string][]string)
 	healthResults := make(map[string]metricstore.HealthCheckResult)
 	startMs := time.Now()
 
-	// Step 1: Build nodeList and metricList per subcluster
+	// Step 2: Build nodeList and metricList per subcluster, skipping down nodes
 	for _, node := range req.Nodes {
+		if nodeStates[node.Hostname] == schema.NodeStateDown {
+			continue
+		}
 		if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil {
 			m[sc] = append(m[sc], node.Hostname)
 		}
@@ -104,7 +115,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
 		}
 	}
 
-	// Step 2: Determine which metric store to query and perform health check
+	// Step 3: Determine which metric store to query and perform health check
 	healthRepo, err := metricdispatch.GetHealthCheckRepo(req.Cluster)
 	if err != nil {
 		cclog.Warnf("updateNodeStates: no metric store for cluster %s, skipping health check: %v", req.Cluster, err)
@@ -123,12 +134,17 @@
 	updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes))
 
 	for _, node := range req.Nodes {
-		state := determineState(node.States)
-		healthState := schema.MonitoringStateFailed
+		state := nodeStates[node.Hostname]
+		var healthState schema.MonitoringState
 		var healthMetrics string
-		if result, ok := healthResults[node.Hostname]; ok {
-			healthState = result.State
-			healthMetrics = result.HealthMetrics
+		if state == schema.NodeStateDown {
+			healthState = schema.MonitoringStateFull
+		} else {
+			healthState = schema.MonitoringStateFailed
+			if result, ok := healthResults[node.Hostname]; ok {
+				healthState = result.State
+				healthMetrics = result.HealthMetrics
+			}
 		}
 		nodeState := schema.NodeStateDB{
 			TimeStamp: requestReceived,