Update to MetricStore HealthCheck API

This commit is contained in:
Aditya Ujeniya
2026-01-30 23:24:16 +01:00
parent 732fab4a04
commit a71341064e
3 changed files with 77 additions and 38 deletions

2
.gitignore vendored
View File

@@ -13,7 +13,7 @@
/var/checkpoints*
migrateTimestamps.pl
test_ccms_write_api*
test_ccms_*
/web/frontend/public/build
/web/frontend/node_modules

View File

@@ -151,6 +151,7 @@ func debugMetrics(rw http.ResponseWriter, r *http.Request) {
// @router /healthcheck/ [get]
func metricsHealth(rw http.ResponseWriter, r *http.Request) {
rawCluster := r.URL.Query().Get("cluster")
rawSubCluster := r.URL.Query().Get("subcluster")
rawNode := r.URL.Query().Get("node")
if rawCluster == "" || rawNode == "" {
@@ -163,8 +164,16 @@ func metricsHealth(rw http.ResponseWriter, r *http.Request) {
selector := []string{rawCluster, rawNode}
ms := metricstore.GetMemoryStore()
if err := ms.HealthCheck(bufio.NewWriter(rw), selector); err != nil {
response, err := ms.HealthCheck(selector, rawSubCluster)
if err != nil {
handleError(err, http.StatusBadRequest, rw)
return
}
jsonData, err := json.Marshal(response)
if err != nil {
cclog.Errorf("Error marshaling HealthCheckResponse JSON: %s", err)
}
rw.Write(jsonData)
}

View File

@@ -6,26 +6,34 @@
package metricstore
import (
"bufio"
"fmt"
"time"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
type HeathCheckResponse struct {
Status schema.MonitoringState
Error error
list List
}
type List struct {
StaleNodeMetricList []string
StaleHardwareMetricList map[string][]string
MissingNodeMetricList []string
MissingHardwareMetricList map[string][]string
}
// MaxMissingDataPoints is a threshold that allows a node to be healthy with certain number of data points missing.
// Suppose a node does not receive last 5 data points, then healthCheck endpoint will still say a
// node is healthy. Anything more than 5 missing points in metrics of the node will deem the node unhealthy.
const MaxMissingDataPoints int64 = 5
// MaxUnhealthyMetrics is a threshold which allows upto certain number of metrics in a node to be unhealthly.
// Works with MaxMissingDataPoints. Say 5 metrics (including submetrics) do not receive the last
// MaxMissingDataPoints data points, then the node will be deemed healthy. Any more metrics that does
// not receive data for MaxMissingDataPoints data points will deem the node unhealthy.
const MaxUnhealthyMetrics int64 = 5
func (b *buffer) healthCheck() int64 {
func (b *buffer) healthCheck() bool {
// Check if the buffer is empty
if b.data == nil {
return 1
return true
}
bufferEnd := b.start + b.frequency*int64(len(b.data))
@@ -33,60 +41,82 @@ func (b *buffer) healthCheck() int64 {
// Check if the buffer is too old
if t-bufferEnd > MaxMissingDataPoints*b.frequency {
return 1
return true
}
return 0
return false
}
func (l *Level) healthCheck(m *MemoryStore, count int64) (int64, error) {
func (l *Level) healthCheck(m *MemoryStore) (List, error) {
l.lock.RLock()
defer l.lock.RUnlock()
for _, mc := range m.Metrics {
list := List{
StaleNodeMetricList: make([]string, 0),
StaleHardwareMetricList: make(map[string][]string, 0),
MissingNodeMetricList: make([]string, 0),
MissingHardwareMetricList: make(map[string][]string, 0),
}
for metricName, mc := range m.Metrics {
if b := l.metrics[mc.offset]; b != nil {
count += b.healthCheck()
if b.healthCheck() {
list.StaleNodeMetricList = append(list.StaleNodeMetricList, metricName)
}
} else {
list.MissingNodeMetricList = append(list.MissingNodeMetricList, metricName)
}
}
for _, lvl := range l.children {
c, err := lvl.healthCheck(m, 0)
for hardwareMetricName, lvl := range l.children {
l, err := lvl.healthCheck(m)
if err != nil {
return 0, err
return List{}, err
}
if len(l.StaleNodeMetricList) != 0 {
list.StaleHardwareMetricList[hardwareMetricName] = l.StaleNodeMetricList
}
if len(l.MissingNodeMetricList) != 0 {
list.MissingHardwareMetricList[hardwareMetricName] = l.MissingNodeMetricList
}
count += c
}
return count, nil
return list, nil
}
func (m *MemoryStore) HealthCheck(w *bufio.Writer, selector []string) error {
func (m *MemoryStore) HealthCheck(selector []string, subcluster string) (*HeathCheckResponse, error) {
response := HeathCheckResponse{
Status: schema.MonitoringStateFull,
}
lvl := m.root.findLevel(selector)
if lvl == nil {
return fmt.Errorf("[METRICSTORE]> not found: %#v", selector)
response.Status = schema.MonitoringStateFailed
response.Error = fmt.Errorf("[METRICSTORE]> error while HealthCheck, host not found: %#v", selector)
return &response, nil
}
buf := make([]byte, 0, 25)
// buf = append(buf, "{"...)
var err error
var count int64 = 0
unhealthyMetricsCount, err := lvl.healthCheck(m, count)
response.list, err = lvl.healthCheck(m)
if err != nil {
return err
return nil, err
}
if unhealthyMetricsCount < MaxUnhealthyMetrics {
buf = append(buf, "Healthy"...)
} else {
buf = append(buf, "Unhealthy"...)
fmt.Printf("Response: %#v\n", response)
if len(response.list.StaleNodeMetricList) != 0 ||
len(response.list.StaleHardwareMetricList) != 0 {
response.Status = schema.MonitoringStatePartial
return &response, nil
}
// buf = append(buf, "}\n"...)
if _, err = w.Write(buf); err != nil {
return err
if len(response.list.MissingHardwareMetricList) != 0 ||
len(response.list.MissingNodeMetricList) != 0 {
response.Status = schema.MonitoringStateFailed
return &response, nil
}
return w.Flush()
return &response, nil
}