diff --git a/internal/api/metricstore.go b/internal/api/metricstore.go
index d99222d2..5c15bb2c 100644
--- a/internal/api/metricstore.go
+++ b/internal/api/metricstore.go
@@ -135,45 +135,3 @@ func debugMetrics(rw http.ResponseWriter, r *http.Request) {
 		return
 	}
 }
-
-// handleHealthCheck godoc
-// @summary HealthCheck endpoint
-// @tags healthcheck
-// @description This endpoint allows the users to check if a node is healthy
-// @produce json
-// @param selector query string false "Selector"
-// @success 200 {string} string "Debug dump"
-// @failure 400 {object} api.ErrorResponse "Bad Request"
-// @failure 401 {object} api.ErrorResponse "Unauthorized"
-// @failure 403 {object} api.ErrorResponse "Forbidden"
-// @failure 500 {object} api.ErrorResponse "Internal Server Error"
-// @security ApiKeyAuth
-// @router /healthcheck/ [get]
-func metricsHealth(rw http.ResponseWriter, r *http.Request) {
-	rawCluster := r.URL.Query().Get("cluster")
-	rawSubCluster := r.URL.Query().Get("subcluster")
-	rawNode := r.URL.Query().Get("node")
-
-	if rawCluster == "" || rawNode == "" {
-		handleError(errors.New("'cluster' and 'node' are required query parameter"), http.StatusBadRequest, rw)
-		return
-	}
-
-	rw.Header().Add("Content-Type", "application/json")
-
-	selector := []string{rawCluster, rawNode}
-
-	ms := metricstore.GetMemoryStore()
-	response, err := ms.HealthCheck(selector, rawSubCluster)
-	if err != nil {
-		handleError(err, http.StatusBadRequest, rw)
-		return
-	}
-
-	jsonData, err := json.Marshal(response)
-	if err != nil {
-		cclog.Errorf("Error marshaling HealthCheckResponse JSON: %s", err)
-	}
-
-	rw.Write(jsonData)
-}
diff --git a/internal/api/node.go b/internal/api/node.go
index ce8a263a..37a8576c 100644
--- a/internal/api/node.go
+++ b/internal/api/node.go
@@ -91,7 +91,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
 		if sc != "" {
 			metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc)
 			metricNames := metricListToNames(metricList)
-			if states, err := ms.HealthCheckAlt(req.Cluster, nl, metricNames); err == nil {
+			if states, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil {
 				maps.Copy(healthStates, states)
 			}
 		}
diff --git a/internal/api/rest.go b/internal/api/rest.go
index 0d52742e..3f6d9609 100644
--- a/internal/api/rest.go
+++ b/internal/api/rest.go
@@ -81,7 +81,7 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) {
 	// Cluster List
 	r.HandleFunc("/clusters/", api.getClusters).Methods(http.MethodGet)
 	// Slurm node state
-	r.HandleFunc("/nodestate/", api.updateNodeStates).Methods(http.MethodPost, http.MethodPut)
+	r.HandleFunc("/nodestates/", api.updateNodeStates).Methods(http.MethodPost, http.MethodPut)
 	// Job Handler
 	if config.Keys.APISubjects == nil {
 		cclog.Info("Enabling REST start/stop job API")
@@ -127,12 +127,12 @@ func (api *RestAPI) MountMetricStoreAPIRoutes(r *mux.Router) {
 	r.HandleFunc("/free", freeMetrics).Methods(http.MethodPost)
 	r.HandleFunc("/write", writeMetrics).Methods(http.MethodPost)
 	r.HandleFunc("/debug", debugMetrics).Methods(http.MethodGet)
-	r.HandleFunc("/healthcheck", metricsHealth).Methods(http.MethodGet)
+	r.HandleFunc("/healthcheck", api.updateNodeStates).Methods(http.MethodPost)
 	// Same endpoints but with trailing slash
 	r.HandleFunc("/free/", freeMetrics).Methods(http.MethodPost)
 	r.HandleFunc("/write/", writeMetrics).Methods(http.MethodPost)
 	r.HandleFunc("/debug/", debugMetrics).Methods(http.MethodGet)
-	r.HandleFunc("/healthcheck/", metricsHealth).Methods(http.MethodGet)
+	r.HandleFunc("/healthcheck/", api.updateNodeStates).Methods(http.MethodPost)
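+	// NOTE: the healthcheck routes are now served by the node-state update
+	// handler, so clients must switch from GET to POST on these endpoints.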
 }
 
 // MountConfigAPIRoutes registers configuration and user management endpoints.
diff --git a/pkg/metricstore/healthcheck.go b/pkg/metricstore/healthcheck.go
index da5ccc7d..8e8e7952 100644
--- a/pkg/metricstore/healthcheck.go
+++ b/pkg/metricstore/healthcheck.go
@@ -6,7 +6,9 @@
 package metricstore
 
 import (
+	"cmp"
 	"fmt"
+	"slices"
 	"time"
 
 	"github.com/ClusterCockpit/cc-lib/v2/schema"
@@ -15,14 +17,6 @@ import (
 type HeathCheckResponse struct {
 	Status schema.MonitoringState
 	Error  error
-	list   List
-}
-
-type List struct {
-	StaleNodeMetricList       []string
-	StaleHardwareMetricList   map[string][]string
-	MissingNodeMetricList     []string
-	MissingHardwareMetricList map[string][]string
 }
 
 // MaxMissingDataPoints is a threshold that allows a node to be healthy with certain number of data points missing.
@@ -30,178 +24,17 @@ type List struct {
 // node is healthy. Anything more than 5 missing points in metrics of the node will deem the node unhealthy.
 const MaxMissingDataPoints int64 = 5
 
-func (b *buffer) healthCheck() bool {
+// bufferExists reports whether the buffer exists and holds at least one data point.
+//
+// It says nothing about data freshness; staleness is evaluated separately by
+// isBufferHealthy. A nil buffer or a buffer without data counts as missing.
+func (b *buffer) bufferExists() bool {
 	// Check if the buffer is empty
-	if b.data == nil {
-		return true
+	if b == nil || len(b.data) == 0 {
+		return false
 	}
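+	// Past this point the buffer holds at least one sample; whether that
+	// data is recent enough is checked by isBufferHealthy.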
 
-	bufferEnd := b.start + b.frequency*int64(len(b.data))
-	t := time.Now().Unix()
-
-	// Check if the buffer is too old
-	if t-bufferEnd > MaxMissingDataPoints*b.frequency {
-		return true
-	}
-
-	return false
-}
-
-// healthCheck recursively examines a level and all its children to identify stale or missing metrics.
-//
-// This routine performs a two-phase check:
-//
-// Phase 1 - Check metrics at current level (node-level metrics):
-// - Iterates through all configured metrics in m.Metrics
-// - For each metric, checks if a buffer exists at l.metrics[mc.offset]
-// - If buffer exists: calls buffer.healthCheck() to verify data freshness
-// - Stale buffer (data older than MaxMissingDataPoints * frequency) → StaleNodeMetricList
-// - Fresh buffer → healthy, no action
-// - If buffer is nil: metric was never written → MissingNodeMetricList
-//
-// Phase 2 - Recursively check child levels (hardware-level metrics):
-// - Iterates through l.children (e.g., "cpu0", "gpu0", "socket0")
-// - Recursively calls healthCheck() on each child level
-// - Aggregates child results into hardware-specific lists:
-// - Child's StaleNodeMetricList → parent's StaleHardwareMetricList[childName]
-// - Child's MissingNodeMetricList → parent's MissingHardwareMetricList[childName]
-//
-// The recursive nature means:
-// - Calling on a host level checks: host metrics + all CPU/GPU/socket metrics
-// - Calling on a socket level checks: socket metrics + all core metrics
-// - Leaf levels (e.g., individual cores) only check their own metrics
-//
-// Parameters:
-// - m: MemoryStore containing the global metric configuration (m.Metrics)
-//
-// Returns:
-// - List: Categorized lists of stale and missing metrics at this level and below
-// - error: Non-nil only for internal errors during recursion
-//
-// Concurrency:
-// - Acquires read lock (RLock) to safely access l.metrics and l.children
-// - Lock held for entire duration including recursive calls
-//
-// Example for host level with structure: host → [cpu0, cpu1]:
-// - Checks host-level metrics (load, memory) → StaleNodeMetricList / MissingNodeMetricList
-// - Recursively checks cpu0 metrics → results in StaleHardwareMetricList["cpu0"]
-// - Recursively checks cpu1 metrics → results in StaleHardwareMetricList["cpu1"]
-func (l *Level) healthCheck(m *MemoryStore) (List, error) {
-	l.lock.RLock()
-	defer l.lock.RUnlock()
-
-	list := List{
-		StaleNodeMetricList:       make([]string, 0),
-		StaleHardwareMetricList:   make(map[string][]string, 0),
-		MissingNodeMetricList:     make([]string, 0),
-		MissingHardwareMetricList: make(map[string][]string, 0),
-	}
-
-	// Phase 1: Check metrics at this level
-	for metricName, mc := range m.Metrics {
-		if b := l.metrics[mc.offset]; b != nil {
-			if b.healthCheck() {
-				list.StaleNodeMetricList = append(list.StaleNodeMetricList, metricName)
-			}
-		} else {
-			list.MissingNodeMetricList = append(list.MissingNodeMetricList, metricName)
-		}
-	}
-
-	// Phase 2: Recursively check child levels (hardware components)
-	for hardwareMetricName, lvl := range l.children {
-		l, err := lvl.healthCheck(m)
-		if err != nil {
-			return List{}, err
-		}
-
-		if len(l.StaleNodeMetricList) != 0 {
-			list.StaleHardwareMetricList[hardwareMetricName] = l.StaleNodeMetricList
-		}
-		if len(l.MissingNodeMetricList) != 0 {
-			list.MissingHardwareMetricList[hardwareMetricName] = l.MissingNodeMetricList
-		}
-	}
-
-	return list, nil
-}
-
-// HealthCheck performs a health check on a specific node in the metric store.
-//
-// This routine checks whether metrics for a given node are being received and are up-to-date.
-// It examines both node-level metrics (e.g., load, memory) and hardware-level metrics
-// (e.g., CPU, GPU, network) to determine the monitoring state.
-//
-// Parameters:
-// - selector: Hierarchical path to the target node, typically []string{cluster, hostname}.
-// Example: []string{"emmy", "node001"} navigates to the "node001" host in the "emmy" cluster. -// The selector must match the hierarchy used during metric ingestion (see Level.findLevelOrCreate). -// - subcluster: Subcluster name (currently unused, reserved for future filtering) -// -// Returns: -// - *HeathCheckResponse: Health status with detailed lists of stale/missing metrics -// - error: Non-nil only for internal errors (not for unhealthy nodes) -// -// Health States: -// - MonitoringStateFull: All expected metrics are present and up-to-date -// - MonitoringStatePartial: Some metrics are stale (data older than MaxMissingDataPoints * frequency) -// - MonitoringStateFailed: Host not found, or metrics are completely missing -// -// The response includes detailed lists: -// - StaleNodeMetricList: Node-level metrics with stale data -// - StaleHardwareMetricList: Hardware-level metrics with stale data (grouped by component) -// - MissingNodeMetricList: Expected node-level metrics that have no data -// - MissingHardwareMetricList: Expected hardware-level metrics that have no data (grouped by component) -// -// Example usage: -// -// selector := []string{"emmy", "node001"} -// response, err := ms.HealthCheck(selector, "") -// if err != nil { -// // Internal error -// } -// switch response.Status { -// case schema.MonitoringStateFull: -// // All metrics healthy -// case schema.MonitoringStatePartial: -// // Check response.list.StaleNodeMetricList for details -// case schema.MonitoringStateFailed: -// // Check response.Error or response.list.MissingNodeMetricList -// } -func (m *MemoryStore) HealthCheck(selector []string, subcluster string) (*HeathCheckResponse, error) { - response := HeathCheckResponse{ - Status: schema.MonitoringStateFull, - } - - lvl := m.root.findLevel(selector) - if lvl == nil { - response.Status = schema.MonitoringStateFailed - response.Error = fmt.Errorf("[METRICSTORE]> error while HealthCheck, host not found: %#v", selector) - return &response, nil - } - - var err error - - response.list, err = lvl.healthCheck(m) - if err != nil { - return nil, err - } - - fmt.Printf("Response: %#v\n", response) - - if len(response.list.StaleNodeMetricList) != 0 || - len(response.list.StaleHardwareMetricList) != 0 { - response.Status = schema.MonitoringStatePartial - return &response, nil - } - - if len(response.list.MissingHardwareMetricList) != 0 || - len(response.list.MissingNodeMetricList) != 0 { - response.Status = schema.MonitoringStateFailed - return &response, nil - } - - return &response, nil + return true } // isBufferHealthy checks if a buffer has received data for the last MaxMissingDataPoints. @@ -209,11 +42,7 @@ func (m *MemoryStore) HealthCheck(selector []string, subcluster string) (*HeathC // Returns true if the buffer is healthy (recent data within threshold), false otherwise. // A nil buffer or empty buffer is considered unhealthy. func (b *buffer) isBufferHealthy() bool { - // Check if the buffer is empty - if b == nil || b.data == nil { - return false - } - + // Get the last endtime of the buffer bufferEnd := b.start + b.frequency*int64(len(b.data)) t := time.Now().Unix() @@ -225,32 +54,20 @@ func (b *buffer) isBufferHealthy() bool { return true } -// countMissingValues counts the number of NaN (missing) values in the most recent data points. -// -// Examines the last MaxMissingDataPoints*2 values in the buffer and counts how many are NaN. -// We check twice the threshold to allow detecting when more than MaxMissingDataPoints are missing. 
 	bufferEnd := b.start + b.frequency*int64(len(b.data))
 	t := time.Now().Unix()
 
@@ -225,32 +54,20 @@ func (b *buffer) isBufferHealthy() bool {
-// countMissingValues counts the number of NaN (missing) values in the most recent data points.
-//
-// Examines the last MaxMissingDataPoints*2 values in the buffer and counts how many are NaN.
-// We check twice the threshold to allow detecting when more than MaxMissingDataPoints are missing.
-// If the buffer has fewer values, examines all available values.
-//
-// Returns:
-// - int: Number of NaN values found in the examined range
-func (b *buffer) countMissingValues() int {
-	if b == nil || b.data == nil || len(b.data) == 0 {
-		return 0
-	}
+// mergeList merges two lists, sorts the result, and removes duplicates.
+// The element type must satisfy 'cmp.Ordered' because the merged data is sorted.
+func mergeList[T cmp.Ordered](list1, list2 []T) []T {
+	// 1. Combine both lists
+	result := append(list1, list2...)
 
-	// Check twice the threshold to detect degraded metrics
-	checkCount := min(int(MaxMissingDataPoints)*2, len(b.data))
+	// 2. Sort the combined list
+	slices.Sort(result)
 
-	// Count NaN values in the most recent data points
-	missingCount := 0
-	startIdx := len(b.data) - checkCount
-	for i := startIdx; i < len(b.data); i++ {
-		if b.data[i].IsNaN() {
-			missingCount++
-		}
-	}
+	// 3. Compact removes consecutive duplicates (standard in Go 1.21+)
+	// e.g. [1, 1, 2, 3, 3] -> [1, 2, 3]
+	result = slices.Compact(result)
 
-	return missingCount
+	return result
 }
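+
+// A usage sketch for mergeList (hypothetical metric names, illustration only):
+//
+//	merged := mergeList([]string{"flops", "mem_bw"}, []string{"cpu_load", "flops"})
+//	// merged == []string{"cpu_load", "flops", "mem_bw"}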
 
-// getHealthyMetrics recursively collects healthy and degraded metrics at this level and below.
+// getHealthyMetrics recursively collects missing and degraded metrics at this level and below.
@@ -272,37 +89,39 @@ func (b *buffer) countMissingValues() int {
-// The routine mirrors healthCheck() but provides more granular classification:
-// - healthCheck() finds problems (stale/missing)
-// - getHealthyMetrics() separates healthy from degraded metrics
+// For every expected metric, the routine classifies it as missing (no buffer
+// exists at this level) or degraded (a buffer exists but its data is stale);
+// expected metrics in neither list are considered healthy.
-func (l *Level) getHealthyMetrics(m *MemoryStore) ([]string, []string, error) {
+func (l *Level) getHealthyMetrics(m *MemoryStore, expectedMetrics []string) ([]string, []string, error) {
 	l.lock.RLock()
 	defer l.lock.RUnlock()
 
-	healthyList := make([]string, 0)
+	globalMetrics := m.Metrics
+
+	missingList := make([]string, 0)
 	degradedList := make([]string, 0)
 
 	// Phase 1: Check metrics at this level
-	for metricName, mc := range m.Metrics {
-		b := l.metrics[mc.offset]
-		if b.isBufferHealthy() {
-			healthyList = append(healthyList, metricName)
-		} else {
+	for _, metricName := range expectedMetrics {
+		mc, ok := globalMetrics[metricName]
+		if !ok {
+			// Unconfigured metrics would silently map to offset 0; count them as missing instead
+			missingList = append(missingList, metricName)
+			continue
+		}
+		b := l.metrics[mc.offset]
+
+		if !b.bufferExists() {
+			missingList = append(missingList, metricName)
+		} else if !b.isBufferHealthy() {
 			degradedList = append(degradedList, metricName)
 		}
 	}
 
 	// Phase 2: Recursively check child levels
 	for _, lvl := range l.children {
-		childHealthy, childDegraded, err := lvl.getHealthyMetrics(m)
+		childMissing, childDegraded, err := lvl.getHealthyMetrics(m, expectedMetrics)
 		if err != nil {
 			return nil, nil, err
 		}
 
-		// FIXME: Use a map to collect core level metrics
-		// Merge child metrics into flat lists
-		healthyList = append(healthyList, childHealthy...)
-		degradedList = append(degradedList, childDegraded...)
+		missingList = mergeList(missingList, childMissing)
+		degradedList = mergeList(degradedList, childDegraded)
 	}
 
-	return healthyList, degradedList, nil
+	return missingList, degradedList, nil
 }
 
-// GetHealthyMetrics returns healthy and degraded metrics for a specific node as flat lists.
+// GetHealthyMetrics returns missing and degraded metrics for a specific node as flat lists.
@@ -343,18 +162,18 @@ func (l *Level) getHealthyMetrics(m *MemoryStore) ([]string, []string, error) {
-// Note: This routine provides more granular classification than HealthCheck:
-// - HealthCheck reports stale/missing metrics (problems)
-// - GetHealthyMetrics separates fully healthy from degraded metrics (quality levels)
+// Note: The check is restricted to expectedMetrics; anything outside that set
+// is ignored. Missing means no buffer was found, degraded means a buffer
+// exists but its most recent data is stale.
-func (m *MemoryStore) GetHealthyMetrics(selector []string) ([]string, []string, error) {
+func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []string) ([]string, []string, error) {
 	lvl := m.root.findLevel(selector)
 	if lvl == nil {
 		return nil, nil, fmt.Errorf("[METRICSTORE]> error while GetHealthyMetrics, host not found: %#v", selector)
 	}
 
-	healthyList, degradedList, err := lvl.getHealthyMetrics(m)
+	missingList, degradedList, err := lvl.getHealthyMetrics(m, expectedMetrics)
 	if err != nil {
 		return nil, nil, err
 	}
 
-	return healthyList, degradedList, nil
+	return missingList, degradedList, nil
 }
 
-// HealthCheckAlt performs health checks on multiple nodes and returns their monitoring states.
+// HealthCheck performs health checks on multiple nodes and returns their monitoring states.
@@ -393,7 +212,7 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string) ([]string, []string,
 // Note: This routine is optimized for batch operations where you need to check
-// the same set of metrics across multiple nodes. For single-node checks with
-// all configured metrics, use HealthCheck() instead.
+// the same set of metrics across multiple nodes. The former single-node
+// HealthCheck(selector, subcluster) variant has been removed.
-func (m *MemoryStore) HealthCheckAlt(cluster string,
+func (m *MemoryStore) HealthCheck(cluster string,
 	nodes []string, expectedMetrics []string,
 ) (map[string]schema.MonitoringState, error) {
 	results := make(map[string]schema.MonitoringState, len(nodes))
@@ -413,33 +232,16 @@ func (m *MemoryStore) HealthCheckAlt(cluster string,
 		missingCount := 0
 
-		// Get healthy and degraded metrics for this node
+		// Get missing and degraded metrics for this node
-		healthyList, degradedList, err := m.GetHealthyMetrics(selector)
+		missingList, degradedList, err := m.GetHealthyMetrics(selector, expectedMetrics)
 		if err != nil {
 			// Node not found or internal error
 			results[hostname] = schema.MonitoringStateFailed
 			continue
 		}
 
-		// Create sets for fast lookup
-		healthySet := make(map[string]bool, len(healthyList))
-		for _, metric := range healthyList {
-			healthySet[metric] = true
-		}
-		degradedSet := make(map[string]bool, len(degradedList))
-		for _, metric := range degradedList {
-			degradedSet[metric] = true
-		}
-
-		// Classify each expected metric
-		for _, metric := range expectedMetrics {
-			if healthySet[metric] {
-				healthyCount++
-			} else if degradedSet[metric] {
-				degradedCount++
-			} else {
-				missingCount++
-			}
-		}
+		missingCount = len(missingList)
+		degradedCount = len(degradedList)
+		// NOTE: a metric can appear in both lists (missing on one level,
+		// degraded on another), so healthyCount is a lower bound.
+		healthyCount = len(expectedMetrics) - (missingCount + degradedCount)
 
 		// Determine overall health status
 		if missingCount > 0 || degradedCount > 0 {
diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go
index e0fcfea5..035aa1be 100644
--- a/pkg/metricstore/metricstore_test.go
+++ b/pkg/metricstore/metricstore_test.go
@@ -219,7 +219,7 @@ func TestHealthCheckAlt(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			results, err := ms.HealthCheckAlt(tt.cluster, tt.nodes, tt.expectedMetrics)
+			results, err := ms.HealthCheck(tt.cluster, tt.nodes, tt.expectedMetrics)
 			if err != nil {
-				t.Errorf("HealthCheckAlt() error = %v", err)
+				t.Errorf("HealthCheck() error = %v", err)
 				return