mirror of
https://github.com/ClusterCockpit/cc-backend
synced 2026-02-17 16:31:45 +01:00
Add healthCheck support for external CCMS
This commit is contained in:
@@ -12,6 +12,13 @@
|
|||||||
"max-age": "2000h"
|
"max-age": "2000h"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"metric-store-external": [
|
||||||
|
{
|
||||||
|
"scope": "fritz",
|
||||||
|
"url": "http://0.0.0.0:8082",
|
||||||
|
"token": "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3NzU3Nzg4NDQsImlhdCI6MTc2ODU3ODg0NCwicm9sZXMiOlsiYWRtaW4iLCJhcGkiXSwic3ViIjoiZGVtbyJ9._SDEW9WaUVXSBFmWqGhyIZXLoqoDU8F1hkfh4cXKIqF4yw7w50IUpfUBtwUFUOnoviFKoi563f6RAMC7XxeLDA"
|
||||||
|
}
|
||||||
|
],
|
||||||
"metric-store": {
|
"metric-store": {
|
||||||
"checkpoints": {
|
"checkpoints": {
|
||||||
"interval": "12h"
|
"interval": "12h"
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
|
||||||
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
"github.com/ClusterCockpit/cc-backend/internal/repository"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
|
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
|
||||||
@@ -77,28 +78,42 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
requestReceived := time.Now().Unix()
|
requestReceived := time.Now().Unix()
|
||||||
repo := repository.GetNodeRepository()
|
repo := repository.GetNodeRepository()
|
||||||
ms := metricstore.GetMemoryStore()
|
|
||||||
|
|
||||||
m := make(map[string][]string)
|
m := make(map[string][]string)
|
||||||
|
metricNames := make(map[string][]string)
|
||||||
healthResults := make(map[string]metricstore.HealthCheckResult)
|
healthResults := make(map[string]metricstore.HealthCheckResult)
|
||||||
|
|
||||||
startMs := time.Now()
|
startMs := time.Now()
|
||||||
|
|
||||||
|
// Step 1: Build nodeList and metricList per subcluster
|
||||||
for _, node := range req.Nodes {
|
for _, node := range req.Nodes {
|
||||||
if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil {
|
if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil {
|
||||||
m[sc] = append(m[sc], node.Hostname)
|
m[sc] = append(m[sc], node.Hostname)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for sc, nl := range m {
|
for sc := range m {
|
||||||
if sc != "" {
|
if sc != "" {
|
||||||
metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc)
|
metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc)
|
||||||
metricNames := metricListToNames(metricList)
|
metricNames[sc] = metricListToNames(metricList)
|
||||||
if results, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil {
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 2: Determine which metric store to query and perform health check
|
||||||
|
healthRepo, err := metricdispatch.GetHealthCheckRepo(req.Cluster)
|
||||||
|
if err != nil {
|
||||||
|
cclog.Warnf("updateNodeStates: no metric store for cluster %s, skipping health check: %v", req.Cluster, err)
|
||||||
|
} else {
|
||||||
|
for sc, nl := range m {
|
||||||
|
if sc != "" {
|
||||||
|
if results, err := healthRepo.HealthCheck(req.Cluster, nl, metricNames[sc]); err == nil {
|
||||||
maps.Copy(healthResults, results)
|
maps.Copy(healthResults, results)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Result: %#v\n", healthResults)
|
||||||
|
|
||||||
cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs))
|
cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs))
|
||||||
startDB := time.Now()
|
startDB := time.Now()
|
||||||
|
|||||||
@@ -52,6 +52,11 @@ type MetricDataRepository interface {
|
|||||||
resolution int,
|
resolution int,
|
||||||
from, to time.Time,
|
from, to time.Time,
|
||||||
ctx context.Context) (map[string]schema.JobData, error)
|
ctx context.Context) (map[string]schema.JobData, error)
|
||||||
|
|
||||||
|
// HealthCheck evaluates the monitoring state for a set of nodes against expected metrics.
|
||||||
|
HealthCheck(cluster string,
|
||||||
|
nodes []string,
|
||||||
|
metrics []string) (map[string]metricstore.HealthCheckResult, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type CCMetricStoreConfig struct {
|
type CCMetricStoreConfig struct {
|
||||||
@@ -110,3 +115,9 @@ func GetMetricDataRepo(cluster string, subcluster string) (MetricDataRepository,
|
|||||||
|
|
||||||
return repo, nil
|
return repo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetHealthCheckRepo returns the MetricDataRepository for performing health checks on a cluster.
|
||||||
|
// It uses the same fallback logic as GetMetricDataRepo: cluster → wildcard → internal.
|
||||||
|
func GetHealthCheckRepo(cluster string) (MetricDataRepository, error) {
|
||||||
|
return GetMetricDataRepo(cluster, "")
|
||||||
|
}
|
||||||
|
|||||||
@@ -63,6 +63,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
"github.com/ClusterCockpit/cc-backend/pkg/archive"
|
||||||
|
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
|
||||||
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
|
||||||
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
"github.com/ClusterCockpit/cc-lib/v2/schema"
|
||||||
)
|
)
|
||||||
@@ -653,6 +654,54 @@ func (ccms *CCMetricStore) LoadNodeListData(
|
|||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HealthCheck queries the external cc-metric-store's health check endpoint.
|
||||||
|
// It sends a HealthCheckReq as the request body to /api/healthcheck and
|
||||||
|
// returns the per-node health check results.
|
||||||
|
func (ccms *CCMetricStore) HealthCheck(cluster string,
|
||||||
|
nodes []string, metrics []string,
|
||||||
|
) (map[string]metricstore.HealthCheckResult, error) {
|
||||||
|
req := metricstore.HealthCheckReq{
|
||||||
|
Cluster: cluster,
|
||||||
|
Nodes: nodes,
|
||||||
|
MetricNames: metrics,
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
if err := json.NewEncoder(buf).Encode(req); err != nil {
|
||||||
|
cclog.Errorf("Error while encoding health check request body: %s", err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
endpoint := fmt.Sprintf("%s/api/healthcheck", ccms.url)
|
||||||
|
httpReq, err := http.NewRequest(http.MethodGet, endpoint, buf)
|
||||||
|
if err != nil {
|
||||||
|
cclog.Errorf("Error while building health check request: %s", err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if ccms.jwt != "" {
|
||||||
|
httpReq.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt))
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := ccms.client.Do(httpReq)
|
||||||
|
if err != nil {
|
||||||
|
cclog.Errorf("Error while performing health check request: %s", err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("'%s': HTTP Status: %s", endpoint, res.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
var results map[string]metricstore.HealthCheckResult
|
||||||
|
if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&results); err != nil {
|
||||||
|
cclog.Errorf("Error while decoding health check response: %s", err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return results, nil
|
||||||
|
}
|
||||||
|
|
||||||
// sanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling.
|
// sanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling.
|
||||||
// Regular float64 values cannot be JSONed when NaN.
|
// Regular float64 values cannot be JSONed when NaN.
|
||||||
func sanitizeStats(avg, min, max *schema.Float) {
|
func sanitizeStats(avg, min, max *schema.Float) {
|
||||||
|
|||||||
@@ -133,6 +133,12 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []str
|
|||||||
return degradedList, missingList, nil
|
return degradedList, missingList, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type HealthCheckReq struct {
|
||||||
|
Cluster string `json:"cluster" example:"fritz"`
|
||||||
|
Nodes []string `json:"nodes"`
|
||||||
|
MetricNames []string `json:"metric-names"`
|
||||||
|
}
|
||||||
|
|
||||||
// HealthCheck evaluates multiple nodes against a set of expected metrics
|
// HealthCheck evaluates multiple nodes against a set of expected metrics
|
||||||
// and returns a monitoring state per node.
|
// and returns a monitoring state per node.
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -42,6 +42,13 @@ type InternalMetricStore struct{}
|
|||||||
|
|
||||||
var MetricStoreHandle *InternalMetricStore
|
var MetricStoreHandle *InternalMetricStore
|
||||||
|
|
||||||
|
// HealthCheck delegates to the internal MemoryStore's HealthCheck.
|
||||||
|
func (ccms *InternalMetricStore) HealthCheck(cluster string,
|
||||||
|
nodes []string, metrics []string,
|
||||||
|
) (map[string]HealthCheckResult, error) {
|
||||||
|
return GetMemoryStore().HealthCheck(cluster, nodes, metrics)
|
||||||
|
}
|
||||||
|
|
||||||
// TestLoadDataCallback allows tests to override LoadData behavior for testing purposes.
|
// TestLoadDataCallback allows tests to override LoadData behavior for testing purposes.
|
||||||
// When set to a non-nil function, LoadData will call this function instead of the default implementation.
|
// When set to a non-nil function, LoadData will call this function instead of the default implementation.
|
||||||
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)
|
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)
|
||||||
|
|||||||
Reference in New Issue
Block a user