Optimized CCMS healthcheck

Aditya Ujeniya
2026-02-04 10:24:45 +01:00
parent 42ce598865
commit 39b8356683
5 changed files with 52 additions and 290 deletions

View File

@@ -135,45 +135,3 @@ func debugMetrics(rw http.ResponseWriter, r *http.Request) {
		return
	}
}
-// handleHealthCheck godoc
-// @summary HealthCheck endpoint
-// @tags healthcheck
-// @description This endpoint allows the users to check if a node is healthy
-// @produce json
-// @param selector query string false "Selector"
-// @success 200 {string} string "Debug dump"
-// @failure 400 {object} api.ErrorResponse "Bad Request"
-// @failure 401 {object} api.ErrorResponse "Unauthorized"
-// @failure 403 {object} api.ErrorResponse "Forbidden"
-// @failure 500 {object} api.ErrorResponse "Internal Server Error"
-// @security ApiKeyAuth
-// @router /healthcheck/ [get]
-func metricsHealth(rw http.ResponseWriter, r *http.Request) {
-	rawCluster := r.URL.Query().Get("cluster")
-	rawSubCluster := r.URL.Query().Get("subcluster")
-	rawNode := r.URL.Query().Get("node")
-	if rawCluster == "" || rawNode == "" {
-		handleError(errors.New("'cluster' and 'node' are required query parameter"), http.StatusBadRequest, rw)
-		return
-	}
-	rw.Header().Add("Content-Type", "application/json")
-	selector := []string{rawCluster, rawNode}
-	ms := metricstore.GetMemoryStore()
-	response, err := ms.HealthCheck(selector, rawSubCluster)
-	if err != nil {
-		handleError(err, http.StatusBadRequest, rw)
-		return
-	}
-	jsonData, err := json.Marshal(response)
-	if err != nil {
-		cclog.Errorf("Error marshaling HealthCheckResponse JSON: %s", err)
-	}
-	rw.Write(jsonData)
-}
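With the dedicated GET handler removed, the same information is now obtained through the batch HealthCheck call on the memory store (reworked further down in this commit). A minimal sketch of the new call shape; the import path, cluster, node, and metric names are illustrative assumptions, not taken from this commit:

package main

import (
	"fmt"

	"github.com/ClusterCockpit/cc-backend/internal/metricstore" // assumed import path
)

func main() {
	ms := metricstore.GetMemoryStore()
	// Hypothetical cluster, node list, and metric names.
	states, err := ms.HealthCheck("emmy", []string{"node001", "node002"},
		[]string{"cpu_load", "mem_used"})
	if err != nil {
		fmt.Println("healthcheck failed:", err)
		return
	}
	for host, state := range states {
		fmt.Printf("%s: %v\n", host, state)
	}
}

Unlike the removed handler, which answered an internal error with HTTP 400, an unknown node now simply appears in the result map with the failed monitoring state.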

View File

@@ -91,7 +91,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
	if sc != "" {
		metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc)
		metricNames := metricListToNames(metricList)
-		if states, err := ms.HealthCheckAlt(req.Cluster, nl, metricNames); err == nil {
+		if states, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil {
			maps.Copy(healthStates, states)
		}
	}
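For context, this call sits in a per-subcluster block: each subcluster checks its own expected metric set, and the per-node results are folded into one healthStates map. The folding relies on maps.Copy, whose overwrite semantics are easy to forget; a self-contained illustration, with plain strings standing in for schema.MonitoringState values:

package main

import (
	"fmt"
	"maps"
)

func main() {
	// maps.Copy overwrites existing keys, so if two subclusters ever
	// reported the same node, the later call would win; with disjoint
	// node lists it is a plain union.
	healthStates := map[string]string{"node001": "full"}
	states := map[string]string{"node002": "partial"}
	maps.Copy(healthStates, states)
	fmt.Println(healthStates) // map[node001:full node002:partial]
}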

View File

@@ -81,7 +81,7 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) {
	// Cluster List
	r.HandleFunc("/clusters/", api.getClusters).Methods(http.MethodGet)
	// Slurm node state
-	r.HandleFunc("/nodestate/", api.updateNodeStates).Methods(http.MethodPost, http.MethodPut)
+	r.HandleFunc("/nodestates/", api.updateNodeStates).Methods(http.MethodPost, http.MethodPut)
	// Job Handler
	if config.Keys.APISubjects == nil {
		cclog.Info("Enabling REST start/stop job API")
@@ -127,12 +127,12 @@ func (api *RestAPI) MountMetricStoreAPIRoutes(r *mux.Router) {
	r.HandleFunc("/free", freeMetrics).Methods(http.MethodPost)
	r.HandleFunc("/write", writeMetrics).Methods(http.MethodPost)
	r.HandleFunc("/debug", debugMetrics).Methods(http.MethodGet)
-	r.HandleFunc("/healthcheck", metricsHealth).Methods(http.MethodGet)
+	r.HandleFunc("/healthcheck", api.updateNodeStates).Methods(http.MethodPost)
	// Same endpoints but with trailing slash
	r.HandleFunc("/free/", freeMetrics).Methods(http.MethodPost)
	r.HandleFunc("/write/", writeMetrics).Methods(http.MethodPost)
	r.HandleFunc("/debug/", debugMetrics).Methods(http.MethodGet)
-	r.HandleFunc("/healthcheck/", metricsHealth).Methods(http.MethodGet)
+	r.HandleFunc("/healthcheck/", api.updateNodeStates).Methods(http.MethodPost)
}
// MountConfigAPIRoutes registers configuration and user management endpoints.
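Callers of the metric-store API must therefore switch from GET to POST for the healthcheck route, since it is now served by the node-state handler. A rough client sketch; the JSON body is an assumption (the handler reads a cluster and a node list, but the exact field names are not visible in this commit), as is the host:

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Hypothetical payload; field names are assumed, not from this commit.
	body := []byte(`{"cluster": "emmy", "nodes": ["node001"]}`)
	resp, err := http.Post("http://localhost:8080/healthcheck/", "application/json", bytes.NewReader(body))
	if err != nil {
		fmt.Println(err)
		return
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}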

View File

@@ -6,7 +6,9 @@
package metricstore

import (
+	"cmp"
	"fmt"
+	"slices"
	"time"

	"github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -15,14 +17,6 @@ import (
type HeathCheckResponse struct {
	Status schema.MonitoringState
	Error  error
-	list   List
}
-type List struct {
-	StaleNodeMetricList       []string
-	StaleHardwareMetricList   map[string][]string
-	MissingNodeMetricList     []string
-	MissingHardwareMetricList map[string][]string
-}
// MaxMissingDataPoints is a threshold that allows a node to be healthy with certain number of data points missing.
@@ -30,178 +24,17 @@ type List struct {
// node is healthy. Anything more than 5 missing points in metrics of the node will deem the node unhealthy.
const MaxMissingDataPoints int64 = 5
-func (b *buffer) healthCheck() bool {
-	// Check if the buffer is empty
-	if b.data == nil {
-		return true
-	}
-	bufferEnd := b.start + b.frequency*int64(len(b.data))
-	t := time.Now().Unix()
-	// Check if the buffer is too old
-	if t-bufferEnd > MaxMissingDataPoints*b.frequency {
-		return true
-	}
-	return false
-}
+// bufferExists checks whether a buffer exists and contains any data.
+//
+// Returns true if the buffer is non-nil and holds at least one data point, false otherwise.
+// A nil or empty buffer counts as missing rather than merely stale.
+func (b *buffer) bufferExists() bool {
+	if b == nil || b.data == nil || len(b.data) == 0 {
+		return false
+	}
+	return true
+}
-// healthCheck recursively examines a level and all its children to identify stale or missing metrics.
-//
-// This routine performs a two-phase check:
-//
-// Phase 1 - Check metrics at current level (node-level metrics):
-//   - Iterates through all configured metrics in m.Metrics
-//   - For each metric, checks if a buffer exists at l.metrics[mc.offset]
-//   - If buffer exists: calls buffer.healthCheck() to verify data freshness
-//   - Stale buffer (data older than MaxMissingDataPoints * frequency) → StaleNodeMetricList
-//   - Fresh buffer → healthy, no action
-//   - If buffer is nil: metric was never written → MissingNodeMetricList
-//
-// Phase 2 - Recursively check child levels (hardware-level metrics):
-//   - Iterates through l.children (e.g., "cpu0", "gpu0", "socket0")
-//   - Recursively calls healthCheck() on each child level
-//   - Aggregates child results into hardware-specific lists:
-//   - Child's StaleNodeMetricList → parent's StaleHardwareMetricList[childName]
-//   - Child's MissingNodeMetricList → parent's MissingHardwareMetricList[childName]
-//
-// The recursive nature means:
-//   - Calling on a host level checks: host metrics + all CPU/GPU/socket metrics
-//   - Calling on a socket level checks: socket metrics + all core metrics
-//   - Leaf levels (e.g., individual cores) only check their own metrics
-//
-// Parameters:
-//   - m: MemoryStore containing the global metric configuration (m.Metrics)
-//
-// Returns:
-//   - List: Categorized lists of stale and missing metrics at this level and below
-//   - error: Non-nil only for internal errors during recursion
-//
-// Concurrency:
-//   - Acquires read lock (RLock) to safely access l.metrics and l.children
-//   - Lock held for entire duration including recursive calls
-//
-// Example for host level with structure: host → [cpu0, cpu1]:
-//   - Checks host-level metrics (load, memory) → StaleNodeMetricList / MissingNodeMetricList
-//   - Recursively checks cpu0 metrics → results in StaleHardwareMetricList["cpu0"]
-//   - Recursively checks cpu1 metrics → results in StaleHardwareMetricList["cpu1"]
-func (l *Level) healthCheck(m *MemoryStore) (List, error) {
-	l.lock.RLock()
-	defer l.lock.RUnlock()
-	list := List{
-		StaleNodeMetricList:       make([]string, 0),
-		StaleHardwareMetricList:   make(map[string][]string, 0),
-		MissingNodeMetricList:     make([]string, 0),
-		MissingHardwareMetricList: make(map[string][]string, 0),
-	}
-	// Phase 1: Check metrics at this level
-	for metricName, mc := range m.Metrics {
-		if b := l.metrics[mc.offset]; b != nil {
-			if b.healthCheck() {
-				list.StaleNodeMetricList = append(list.StaleNodeMetricList, metricName)
-			}
-		} else {
-			list.MissingNodeMetricList = append(list.MissingNodeMetricList, metricName)
-		}
-	}
-	// Phase 2: Recursively check child levels (hardware components)
-	for hardwareMetricName, lvl := range l.children {
-		l, err := lvl.healthCheck(m)
-		if err != nil {
-			return List{}, err
-		}
-		if len(l.StaleNodeMetricList) != 0 {
-			list.StaleHardwareMetricList[hardwareMetricName] = l.StaleNodeMetricList
-		}
-		if len(l.MissingNodeMetricList) != 0 {
-			list.MissingHardwareMetricList[hardwareMetricName] = l.MissingNodeMetricList
-		}
-	}
-	return list, nil
-}
-// HealthCheck performs a health check on a specific node in the metric store.
-//
-// This routine checks whether metrics for a given node are being received and are up-to-date.
-// It examines both node-level metrics (e.g., load, memory) and hardware-level metrics
-// (e.g., CPU, GPU, network) to determine the monitoring state.
-//
-// Parameters:
-//   - selector: Hierarchical path to the target node, typically []string{cluster, hostname}.
-//     Example: []string{"emmy", "node001"} navigates to the "node001" host in the "emmy" cluster.
-//     The selector must match the hierarchy used during metric ingestion (see Level.findLevelOrCreate).
-//   - subcluster: Subcluster name (currently unused, reserved for future filtering)
-//
-// Returns:
-//   - *HeathCheckResponse: Health status with detailed lists of stale/missing metrics
-//   - error: Non-nil only for internal errors (not for unhealthy nodes)
-//
-// Health States:
-//   - MonitoringStateFull: All expected metrics are present and up-to-date
-//   - MonitoringStatePartial: Some metrics are stale (data older than MaxMissingDataPoints * frequency)
-//   - MonitoringStateFailed: Host not found, or metrics are completely missing
-//
-// The response includes detailed lists:
-//   - StaleNodeMetricList: Node-level metrics with stale data
-//   - StaleHardwareMetricList: Hardware-level metrics with stale data (grouped by component)
-//   - MissingNodeMetricList: Expected node-level metrics that have no data
-//   - MissingHardwareMetricList: Expected hardware-level metrics that have no data (grouped by component)
-//
-// Example usage:
-//
-//	selector := []string{"emmy", "node001"}
-//	response, err := ms.HealthCheck(selector, "")
-//	if err != nil {
-//		// Internal error
-//	}
-//	switch response.Status {
-//	case schema.MonitoringStateFull:
-//		// All metrics healthy
-//	case schema.MonitoringStatePartial:
-//		// Check response.list.StaleNodeMetricList for details
-//	case schema.MonitoringStateFailed:
-//		// Check response.Error or response.list.MissingNodeMetricList
-//	}
-func (m *MemoryStore) HealthCheck(selector []string, subcluster string) (*HeathCheckResponse, error) {
-	response := HeathCheckResponse{
-		Status: schema.MonitoringStateFull,
-	}
-	lvl := m.root.findLevel(selector)
-	if lvl == nil {
-		response.Status = schema.MonitoringStateFailed
-		response.Error = fmt.Errorf("[METRICSTORE]> error while HealthCheck, host not found: %#v", selector)
-		return &response, nil
-	}
-	var err error
-	response.list, err = lvl.healthCheck(m)
-	if err != nil {
-		return nil, err
-	}
-	fmt.Printf("Response: %#v\n", response)
-	if len(response.list.StaleNodeMetricList) != 0 ||
-		len(response.list.StaleHardwareMetricList) != 0 {
-		response.Status = schema.MonitoringStatePartial
-		return &response, nil
-	}
-	if len(response.list.MissingHardwareMetricList) != 0 ||
-		len(response.list.MissingNodeMetricList) != 0 {
-		response.Status = schema.MonitoringStateFailed
-		return &response, nil
-	}
-	return &response, nil
-}
// isBufferHealthy checks if a buffer has received data for the last MaxMissingDataPoints.
@@ -209,11 +42,7 @@ func (m *MemoryStore) HealthCheck(selector []string, subcluster string) (*HeathC
//
// Returns true if the buffer is healthy (recent data within threshold), false otherwise.
// A nil buffer or empty buffer is considered unhealthy.
func (b *buffer) isBufferHealthy() bool {
-	// Check if the buffer is empty
-	if b == nil || b.data == nil {
-		return false
-	}
	// Get the last endtime of the buffer
	bufferEnd := b.start + b.frequency*int64(len(b.data))
	t := time.Now().Unix()
@@ -225,32 +54,20 @@ func (b *buffer) isBufferHealthy() bool {
	return true
}
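Concretely, the freshness test compares the buffer's end against the wall clock: with a 60 s sampling frequency and MaxMissingDataPoints = 5, data older than 300 s makes the buffer unhealthy. The comparison itself falls in an unshown stretch of this hunk, so the standalone sketch below mirrors the check from the removed healthCheck; all values are made up:

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxMissingDataPoints int64 = 5 // mirrors MaxMissingDataPoints
	var (
		start     int64 = time.Now().Unix() - 1000 // timestamp of the first sample (made up)
		frequency int64 = 60                       // one sample per minute
		samples   int64 = 10                       // len(b.data)
	)
	// End of the buffer, as computed in isBufferHealthy.
	bufferEnd := start + frequency*samples
	age := time.Now().Unix() - bufferEnd
	// Stale once the newest data is older than 5 sampling intervals (300 s here).
	fmt.Println("stale:", age > maxMissingDataPoints*frequency) // true: age is ~400 s
}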
-// countMissingValues counts the number of NaN (missing) values in the most recent data points.
-//
-// Examines the last MaxMissingDataPoints*2 values in the buffer and counts how many are NaN.
-// We check twice the threshold to allow detecting when more than MaxMissingDataPoints are missing.
-// If the buffer has fewer values, examines all available values.
-//
-// Returns:
-//   - int: Number of NaN values found in the examined range
-func (b *buffer) countMissingValues() int {
-	if b == nil || b.data == nil || len(b.data) == 0 {
-		return 0
-	}
-	// Check twice the threshold to detect degraded metrics
-	checkCount := min(int(MaxMissingDataPoints)*2, len(b.data))
-	// Count NaN values in the most recent data points
-	missingCount := 0
-	startIdx := len(b.data) - checkCount
-	for i := startIdx; i < len(b.data); i++ {
-		if b.data[i].IsNaN() {
-			missingCount++
-		}
-	}
-	return missingCount
-}
+// mergeList merges two lists, sorts them, and removes duplicates.
+// Requires 'cmp.Ordered' because we need to sort the data.
+func mergeList[string cmp.Ordered](list1, list2 []string) []string {
+	// 1. Combine both lists
+	result := append(list1, list2...)
+	// 2. Sort the combined list
+	slices.Sort(result)
+	// 3. Compact removes consecutive duplicates (standard in Go 1.21+)
+	// e.g. [1, 1, 2, 3, 3] -> [1, 2, 3]
+	result = slices.Compact(result)
+	return result
+}
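Two things are worth noting about the new helper. First, its type parameter is literally named string (shadowing the predeclared identifier), which compiles but is easy to misread; the usage sketch below renames it T. Second, append(list1, list2...) can write into list1's spare capacity, so callers should not assume their first argument is left untouched.

package main

import (
	"cmp"
	"fmt"
	"slices"
)

// Same shape as the commit's helper: sort the concatenation, then drop
// consecutive duplicates. T replaces the commit's confusing parameter name.
func mergeList[T cmp.Ordered](list1, list2 []T) []T {
	result := append(list1, list2...)
	slices.Sort(result)
	return slices.Compact(result)
}

func main() {
	a := []string{"mem_used", "cpu_load"}
	b := []string{"cpu_load", "flops_any"}
	fmt.Println(mergeList(a, b)) // [cpu_load flops_any mem_used]
}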
// getHealthyMetrics recursively collects healthy and degraded metrics at this level and below.
@@ -272,37 +89,39 @@ func (b *buffer) countMissingValues() int {
// The routine mirrors healthCheck() but provides more granular classification:
// - healthCheck() finds problems (stale/missing)
// - getHealthyMetrics() separates healthy from degraded metrics
-func (l *Level) getHealthyMetrics(m *MemoryStore) ([]string, []string, error) {
+func (l *Level) getHealthyMetrics(m *MemoryStore, expectedMetrics []string) ([]string, []string, error) {
	l.lock.RLock()
	defer l.lock.RUnlock()
-	healthyList := make([]string, 0)
+	globalMetrics := m.Metrics
+	missingList := make([]string, 0)
	degradedList := make([]string, 0)
	// Phase 1: Check metrics at this level
-	for metricName, mc := range m.Metrics {
-		b := l.metrics[mc.offset]
-		if b.isBufferHealthy() {
-			healthyList = append(healthyList, metricName)
-		} else {
+	for _, metricName := range expectedMetrics {
+		offset := globalMetrics[metricName].offset
+		b := l.metrics[offset]
+		if !b.bufferExists() {
+			missingList = append(missingList, metricName)
+		} else if !b.isBufferHealthy() {
			degradedList = append(degradedList, metricName)
		}
	}
	// Phase 2: Recursively check child levels
	for _, lvl := range l.children {
-		childHealthy, childDegraded, err := lvl.getHealthyMetrics(m)
+		childMissing, childDegraded, err := lvl.getHealthyMetrics(m, expectedMetrics)
		if err != nil {
			return nil, nil, err
		}
-		// FIXME: Use a map to collect core level metrics
		// Merge child metrics into flat lists
-		healthyList = append(healthyList, childHealthy...)
-		degradedList = append(degradedList, childDegraded...)
+		missingList = mergeList(missingList, childMissing)
+		degradedList = mergeList(degradedList, childDegraded)
	}
-	return healthyList, degradedList, nil
+	return missingList, degradedList, nil
}
// GetHealthyMetrics returns healthy and degraded metrics for a specific node as flat lists.
@@ -343,18 +162,18 @@ func (l *Level) getHealthyMetrics(m *MemoryStore) ([]string, []string, error) {
// Note: This routine provides more granular classification than HealthCheck:
//   - HealthCheck reports stale/missing metrics (problems)
//   - GetHealthyMetrics separates fully healthy from degraded metrics (quality levels)
-func (m *MemoryStore) GetHealthyMetrics(selector []string) ([]string, []string, error) {
+func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []string) ([]string, []string, error) {
	lvl := m.root.findLevel(selector)
	if lvl == nil {
		return nil, nil, fmt.Errorf("[METRICSTORE]> error while GetHealthyMetrics, host not found: %#v", selector)
	}
-	healthyList, degradedList, err := lvl.getHealthyMetrics(m)
+	missingList, degradedList, err := lvl.getHealthyMetrics(m, expectedMetrics)
	if err != nil {
		return nil, nil, err
	}
-	return healthyList, degradedList, nil
+	return missingList, degradedList, nil
}
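A short usage sketch of the reworked per-node query; the selector and metric names are examples, and the import path is assumed:

package main

import (
	"fmt"

	"github.com/ClusterCockpit/cc-backend/internal/metricstore" // assumed import path
)

// reportNodeHealth sketches the new contract: the caller supplies the
// expected metric set and receives missing and degraded lists back.
func reportNodeHealth(ms *metricstore.MemoryStore) {
	selector := []string{"emmy", "node001"}      // cluster, host (examples)
	expected := []string{"cpu_load", "mem_used"} // hypothetical metric names
	missing, degraded, err := ms.GetHealthyMetrics(selector, expected)
	if err != nil {
		fmt.Println("host not found:", err)
		return
	}
	fmt.Println("missing:", missing, "degraded:", degraded)
}

func main() {
	reportNodeHealth(metricstore.GetMemoryStore())
}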
// HealthCheckAlt performs health checks on multiple nodes and returns their monitoring states.
@@ -393,7 +212,7 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string) ([]string, []string,
// Note: This routine is optimized for batch operations where you need to check
// the same set of metrics across multiple nodes. For single-node checks with
// all configured metrics, use HealthCheck() instead.
-func (m *MemoryStore) HealthCheckAlt(cluster string,
+func (m *MemoryStore) HealthCheck(cluster string,
	nodes []string, expectedMetrics []string,
) (map[string]schema.MonitoringState, error) {
	results := make(map[string]schema.MonitoringState, len(nodes))
@@ -413,33 +232,16 @@ func (m *MemoryStore) HealthCheckAlt(cluster string,
		missingCount := 0
		// Get healthy and degraded metrics for this node
-		healthyList, degradedList, err := m.GetHealthyMetrics(selector)
+		missingList, degradedList, err := m.GetHealthyMetrics(selector, expectedMetrics)
		if err != nil {
			// Node not found or internal error
			results[hostname] = schema.MonitoringStateFailed
			continue
		}
-		// Create sets for fast lookup
-		healthySet := make(map[string]bool, len(healthyList))
-		for _, metric := range healthyList {
-			healthySet[metric] = true
-		}
-		degradedSet := make(map[string]bool, len(degradedList))
-		for _, metric := range degradedList {
-			degradedSet[metric] = true
-		}
-		// Classify each expected metric
-		for _, metric := range expectedMetrics {
-			if healthySet[metric] {
-				healthyCount++
-			} else if degradedSet[metric] {
-				degradedCount++
-			} else {
-				missingCount++
-			}
-		}
+		missingCount = len(missingList)
+		degradedCount = len(degradedList)
+		healthyCount = len(expectedMetrics) - (missingCount + degradedCount)
		// Determine overall health status
		if missingCount > 0 || degradedCount > 0 {
@@ -456,5 +258,7 @@ func (m *MemoryStore) HealthCheckAlt(cluster string,
		results[hostname] = status
	}
+	fmt.Printf("Results : %#v\n\n", results)
	return results, nil
}
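The optimization replaces the per-metric set lookups with plain arithmetic over the returned list lengths. A self-contained rendering of that counting, with the status rule hedged because the body of the "Determine overall health status" block is truncated in this hunk:

package main

import "fmt"

func main() {
	expectedMetrics := []string{"cpu_load", "mem_used", "flops_any"} // examples
	missingList := []string{"flops_any"}                             // from GetHealthyMetrics
	degradedList := []string{}

	// Counts now come straight from the list lengths; healthy is whatever
	// remains of the expected set.
	missingCount := len(missingList)
	degradedCount := len(degradedList)
	healthyCount := len(expectedMetrics) - (missingCount + degradedCount)

	// Assumed consequence of the partially shown status block: any missing
	// or degraded metric means the node is not in the full monitoring state.
	status := "full"
	if missingCount > 0 || degradedCount > 0 {
		status = "not full (exact state truncated in this hunk)"
	}
	fmt.Println(healthyCount, status) // 2 not full (...)
}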

View File

@@ -219,7 +219,7 @@ func TestHealthCheckAlt(t *testing.T) {
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
-			results, err := ms.HealthCheckAlt(tt.cluster, tt.nodes, tt.expectedMetrics)
+			results, err := ms.HealthCheck(tt.cluster, tt.nodes, tt.expectedMetrics)
			if err != nil {
				t.Errorf("HealthCheckAlt() error = %v", err)
				return