Merge pull request #489 from ClusterCockpit/dev

Dev
This commit is contained in:
Jan Eitzinger
2026-02-17 08:22:10 +01:00
committed by GitHub
9 changed files with 112 additions and 11 deletions

View File

@@ -12,6 +12,13 @@
"max-age": "2000h"
}
},
"metric-store-external": [
{
"scope": "fritz",
"url": "http://0.0.0.0:8082",
"token": "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3NzU3Nzg4NDQsImlhdCI6MTc2ODU3ODg0NCwicm9sZXMiOlsiYWRtaW4iLCJhcGkiXSwic3ViIjoiZGVtbyJ9._SDEW9WaUVXSBFmWqGhyIZXLoqoDU8F1hkfh4cXKIqF4yw7w50IUpfUBtwUFUOnoviFKoi563f6RAMC7XxeLDA"
}
],
"metric-store": {
"checkpoints": {
"interval": "12h"

View File

@@ -12,6 +12,7 @@ import (
"strings"
"time"
"github.com/ClusterCockpit/cc-backend/internal/metricdispatch"
"github.com/ClusterCockpit/cc-backend/internal/repository"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
@@ -77,29 +78,43 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
}
requestReceived := time.Now().Unix()
repo := repository.GetNodeRepository()
ms := metricstore.GetMemoryStore()
m := make(map[string][]string)
metricNames := make(map[string][]string)
healthResults := make(map[string]metricstore.HealthCheckResult)
startMs := time.Now()
// Step 1: Build nodeList and metricList per subcluster
for _, node := range req.Nodes {
if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil {
m[sc] = append(m[sc], node.Hostname)
}
}
for sc, nl := range m {
for sc := range m {
if sc != "" {
metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc)
metricNames := metricListToNames(metricList)
if results, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil {
maps.Copy(healthResults, results)
metricNames[sc] = metricListToNames(metricList)
}
}
// Step 2: Determine which metric store to query and perform health check
healthRepo, err := metricdispatch.GetHealthCheckRepo(req.Cluster)
if err != nil {
cclog.Warnf("updateNodeStates: no metric store for cluster %s, skipping health check: %v", req.Cluster, err)
} else {
for sc, nl := range m {
if sc != "" {
if results, err := healthRepo.HealthCheck(req.Cluster, nl, metricNames[sc]); err == nil {
maps.Copy(healthResults, results)
}
}
}
}
fmt.Printf("Result: %#v\n", healthResults)
cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs))
startDB := time.Now()

View File

@@ -52,6 +52,11 @@ type MetricDataRepository interface {
resolution int,
from, to time.Time,
ctx context.Context) (map[string]schema.JobData, error)
// HealthCheck evaluates the monitoring state for a set of nodes against expected metrics.
HealthCheck(cluster string,
nodes []string,
metrics []string) (map[string]metricstore.HealthCheckResult, error)
}
type CCMetricStoreConfig struct {
@@ -110,3 +115,9 @@ func GetMetricDataRepo(cluster string, subcluster string) (MetricDataRepository,
return repo, nil
}
// GetHealthCheckRepo returns the MetricDataRepository for performing health checks on a cluster.
// It uses the same fallback logic as GetMetricDataRepo: cluster → wildcard → internal.
func GetHealthCheckRepo(cluster string) (MetricDataRepository, error) {
return GetMetricDataRepo(cluster, "")
}

View File

@@ -63,6 +63,7 @@ import (
"time"
"github.com/ClusterCockpit/cc-backend/pkg/archive"
"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/ClusterCockpit/cc-lib/v2/schema"
)
@@ -653,6 +654,54 @@ func (ccms *CCMetricStore) LoadNodeListData(
return data, nil
}
// HealthCheck queries the external cc-metric-store's health check endpoint.
// It sends a HealthCheckReq as the request body to /api/healthcheck and
// returns the per-node health check results.
func (ccms *CCMetricStore) HealthCheck(cluster string,
nodes []string, metrics []string,
) (map[string]metricstore.HealthCheckResult, error) {
req := metricstore.HealthCheckReq{
Cluster: cluster,
Nodes: nodes,
MetricNames: metrics,
}
buf := &bytes.Buffer{}
if err := json.NewEncoder(buf).Encode(req); err != nil {
cclog.Errorf("Error while encoding health check request body: %s", err.Error())
return nil, err
}
endpoint := fmt.Sprintf("%s/api/healthcheck", ccms.url)
httpReq, err := http.NewRequest(http.MethodGet, endpoint, buf)
if err != nil {
cclog.Errorf("Error while building health check request: %s", err.Error())
return nil, err
}
if ccms.jwt != "" {
httpReq.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt))
}
res, err := ccms.client.Do(httpReq)
if err != nil {
cclog.Errorf("Error while performing health check request: %s", err.Error())
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("'%s': HTTP Status: %s", endpoint, res.Status)
}
var results map[string]metricstore.HealthCheckResult
if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&results); err != nil {
cclog.Errorf("Error while decoding health check response: %s", err.Error())
return nil, err
}
return results, nil
}
// sanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling.
// Regular float64 values cannot be JSONed when NaN.
func sanitizeStats(avg, min, max *schema.Float) {

View File

@@ -51,10 +51,14 @@ func newTagger() {
jobTagger.stopTaggers = append(jobTagger.stopTaggers, &JobClassTagger{})
for _, tagger := range jobTagger.startTaggers {
tagger.Register()
if err := tagger.Register(); err != nil {
cclog.Errorf("failed to register start tagger: %s", err)
}
}
for _, tagger := range jobTagger.stopTaggers {
tagger.Register()
if err := tagger.Register(); err != nil {
cclog.Errorf("failed to register stop tagger: %s", err)
}
}
}

View File

@@ -237,9 +237,10 @@ func (b *buffer) free(t int64) (delme bool, n int) {
n += m
if delme {
b.prev.next = nil
if cap(b.prev.data) == BufferCap {
bufferPool.Put(b.prev)
if cap(b.prev.data) != BufferCap {
b.prev.data = make([]schema.Float, 0, BufferCap)
}
bufferPool.Put(b.prev)
b.prev = nil
}
}

View File

@@ -133,6 +133,12 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []str
return degradedList, missingList, nil
}
type HealthCheckReq struct {
Cluster string `json:"cluster" example:"fritz"`
Nodes []string `json:"nodes"`
MetricNames []string `json:"metric-names"`
}
// HealthCheck evaluates multiple nodes against a set of expected metrics
// and returns a monitoring state per node.
//

View File

@@ -189,9 +189,10 @@ func (l *Level) free(t int64) (int, error) {
delme, m := b.free(t)
n += m
if delme {
if cap(b.data) == BufferCap {
bufferPool.Put(b)
if cap(b.data) != BufferCap {
b.data = make([]schema.Float, 0, BufferCap)
}
bufferPool.Put(b)
l.metrics[i] = nil
}
}

View File

@@ -42,6 +42,13 @@ type InternalMetricStore struct{}
var MetricStoreHandle *InternalMetricStore
// HealthCheck delegates to the internal MemoryStore's HealthCheck.
func (ccms *InternalMetricStore) HealthCheck(cluster string,
nodes []string, metrics []string,
) (map[string]HealthCheckResult, error) {
return GetMemoryStore().HealthCheck(cluster, nodes, metrics)
}
// TestLoadDataCallback allows tests to override LoadData behavior for testing purposes.
// When set to a non-nil function, LoadData will call this function instead of the default implementation.
var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error)