From 2eeefc27201752bc3d0d4d6c256c8c4660c6c50d Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Mon, 16 Feb 2026 16:57:17 +0100 Subject: [PATCH 1/3] Add healthCheck support for external CCMS --- configs/config-demo.json | 7 +++ internal/api/node.go | 25 ++++++++-- internal/metricdispatch/metricdata.go | 11 +++++ internal/metricstoreclient/cc-metric-store.go | 49 +++++++++++++++++++ pkg/metricstore/healthcheck.go | 6 +++ pkg/metricstore/query.go | 7 +++ 6 files changed, 100 insertions(+), 5 deletions(-) diff --git a/configs/config-demo.json b/configs/config-demo.json index c3042993..51edb72c 100644 --- a/configs/config-demo.json +++ b/configs/config-demo.json @@ -12,6 +12,13 @@ "max-age": "2000h" } }, + "metric-store-external": [ + { + "scope": "fritz", + "url": "http://0.0.0.0:8082", + "token": "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3NzU3Nzg4NDQsImlhdCI6MTc2ODU3ODg0NCwicm9sZXMiOlsiYWRtaW4iLCJhcGkiXSwic3ViIjoiZGVtbyJ9._SDEW9WaUVXSBFmWqGhyIZXLoqoDU8F1hkfh4cXKIqF4yw7w50IUpfUBtwUFUOnoviFKoi563f6RAMC7XxeLDA" + } + ], "metric-store": { "checkpoints": { "interval": "12h" diff --git a/internal/api/node.go b/internal/api/node.go index e6b19479..cab33452 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -12,6 +12,7 @@ import ( "strings" "time" + "github.com/ClusterCockpit/cc-backend/internal/metricdispatch" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/metricstore" @@ -77,29 +78,43 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { } requestReceived := time.Now().Unix() repo := repository.GetNodeRepository() - ms := metricstore.GetMemoryStore() m := make(map[string][]string) + metricNames := make(map[string][]string) healthResults := make(map[string]metricstore.HealthCheckResult) startMs := time.Now() + // Step 1: Build nodeList and metricList per subcluster for _, node := range req.Nodes { if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil { m[sc] = append(m[sc], node.Hostname) } } - for sc, nl := range m { + for sc := range m { if sc != "" { metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc) - metricNames := metricListToNames(metricList) - if results, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil { - maps.Copy(healthResults, results) + metricNames[sc] = metricListToNames(metricList) + } + } + + // Step 2: Determine which metric store to query and perform health check + healthRepo, err := metricdispatch.GetHealthCheckRepo(req.Cluster) + if err != nil { + cclog.Warnf("updateNodeStates: no metric store for cluster %s, skipping health check: %v", req.Cluster, err) + } else { + for sc, nl := range m { + if sc != "" { + if results, err := healthRepo.HealthCheck(req.Cluster, nl, metricNames[sc]); err == nil { + maps.Copy(healthResults, results) + } } } } + fmt.Printf("Result: %#v\n", healthResults) + cclog.Debugf("Timer updateNodeStates, MemStore HealthCheck: %s", time.Since(startMs)) startDB := time.Now() diff --git a/internal/metricdispatch/metricdata.go b/internal/metricdispatch/metricdata.go index 9626ac86..36a10004 100755 --- a/internal/metricdispatch/metricdata.go +++ b/internal/metricdispatch/metricdata.go @@ -52,6 +52,11 @@ type MetricDataRepository interface { resolution int, from, to time.Time, ctx context.Context) (map[string]schema.JobData, error) + + // HealthCheck evaluates the monitoring state for a set of nodes against expected metrics. + HealthCheck(cluster string, + nodes []string, + metrics []string) (map[string]metricstore.HealthCheckResult, error) } type CCMetricStoreConfig struct { @@ -110,3 +115,9 @@ func GetMetricDataRepo(cluster string, subcluster string) (MetricDataRepository, return repo, nil } + +// GetHealthCheckRepo returns the MetricDataRepository for performing health checks on a cluster. +// It uses the same fallback logic as GetMetricDataRepo: cluster → wildcard → internal. +func GetHealthCheckRepo(cluster string) (MetricDataRepository, error) { + return GetMetricDataRepo(cluster, "") +} diff --git a/internal/metricstoreclient/cc-metric-store.go b/internal/metricstoreclient/cc-metric-store.go index 4472b825..81add789 100644 --- a/internal/metricstoreclient/cc-metric-store.go +++ b/internal/metricstoreclient/cc-metric-store.go @@ -63,6 +63,7 @@ import ( "time" "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/metricstore" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" "github.com/ClusterCockpit/cc-lib/v2/schema" ) @@ -653,6 +654,54 @@ func (ccms *CCMetricStore) LoadNodeListData( return data, nil } +// HealthCheck queries the external cc-metric-store's health check endpoint. +// It sends a HealthCheckReq as the request body to /api/healthcheck and +// returns the per-node health check results. +func (ccms *CCMetricStore) HealthCheck(cluster string, + nodes []string, metrics []string, +) (map[string]metricstore.HealthCheckResult, error) { + req := metricstore.HealthCheckReq{ + Cluster: cluster, + Nodes: nodes, + MetricNames: metrics, + } + + buf := &bytes.Buffer{} + if err := json.NewEncoder(buf).Encode(req); err != nil { + cclog.Errorf("Error while encoding health check request body: %s", err.Error()) + return nil, err + } + + endpoint := fmt.Sprintf("%s/api/healthcheck", ccms.url) + httpReq, err := http.NewRequest(http.MethodGet, endpoint, buf) + if err != nil { + cclog.Errorf("Error while building health check request: %s", err.Error()) + return nil, err + } + if ccms.jwt != "" { + httpReq.Header.Add("Authorization", fmt.Sprintf("Bearer %s", ccms.jwt)) + } + + res, err := ccms.client.Do(httpReq) + if err != nil { + cclog.Errorf("Error while performing health check request: %s", err.Error()) + return nil, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("'%s': HTTP Status: %s", endpoint, res.Status) + } + + var results map[string]metricstore.HealthCheckResult + if err := json.NewDecoder(bufio.NewReader(res.Body)).Decode(&results); err != nil { + cclog.Errorf("Error while decoding health check response: %s", err.Error()) + return nil, err + } + + return results, nil +} + // sanitizeStats replaces NaN values in statistics with 0 to enable JSON marshaling. // Regular float64 values cannot be JSONed when NaN. func sanitizeStats(avg, min, max *schema.Float) { diff --git a/pkg/metricstore/healthcheck.go b/pkg/metricstore/healthcheck.go index d6def692..73973ab0 100644 --- a/pkg/metricstore/healthcheck.go +++ b/pkg/metricstore/healthcheck.go @@ -133,6 +133,12 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []str return degradedList, missingList, nil } +type HealthCheckReq struct { + Cluster string `json:"cluster" example:"fritz"` + Nodes []string `json:"nodes"` + MetricNames []string `json:"metric-names"` +} + // HealthCheck evaluates multiple nodes against a set of expected metrics // and returns a monitoring state per node. // diff --git a/pkg/metricstore/query.go b/pkg/metricstore/query.go index 709a9710..7dce5dcd 100644 --- a/pkg/metricstore/query.go +++ b/pkg/metricstore/query.go @@ -42,6 +42,13 @@ type InternalMetricStore struct{} var MetricStoreHandle *InternalMetricStore +// HealthCheck delegates to the internal MemoryStore's HealthCheck. +func (ccms *InternalMetricStore) HealthCheck(cluster string, + nodes []string, metrics []string, +) (map[string]HealthCheckResult, error) { + return GetMemoryStore().HealthCheck(cluster, nodes, metrics) +} + // TestLoadDataCallback allows tests to override LoadData behavior for testing purposes. // When set to a non-nil function, LoadData will call this function instead of the default implementation. var TestLoadDataCallback func(job *schema.Job, metrics []string, scopes []schema.MetricScope, ctx context.Context, resolution int) (schema.JobData, error) From 1cf2c41bd703ac1a5e75f5ae1406fcca2afd1cce Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Mon, 16 Feb 2026 18:21:45 +0100 Subject: [PATCH 2/3] Resize the buffers and put them into the pool --- pkg/metricstore/buffer.go | 5 +++-- pkg/metricstore/level.go | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/metricstore/buffer.go b/pkg/metricstore/buffer.go index 46eb5149..665d8012 100644 --- a/pkg/metricstore/buffer.go +++ b/pkg/metricstore/buffer.go @@ -237,9 +237,10 @@ func (b *buffer) free(t int64) (delme bool, n int) { n += m if delme { b.prev.next = nil - if cap(b.prev.data) == BufferCap { - bufferPool.Put(b.prev) + if cap(b.prev.data) != BufferCap { + b.prev.data = make([]schema.Float, 0, BufferCap) } + bufferPool.Put(b.prev) b.prev = nil } } diff --git a/pkg/metricstore/level.go b/pkg/metricstore/level.go index bfa0ddf0..85c2ba7b 100644 --- a/pkg/metricstore/level.go +++ b/pkg/metricstore/level.go @@ -189,9 +189,10 @@ func (l *Level) free(t int64) (int, error) { delme, m := b.free(t) n += m if delme { - if cap(b.data) == BufferCap { - bufferPool.Put(b) + if cap(b.data) != BufferCap { + b.data = make([]schema.Float, 0, BufferCap) } + bufferPool.Put(b) l.metrics[i] = nil } } From 9af44779aa2ad5874128d70c4d5a0bf874167fd5 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 17 Feb 2026 08:13:50 +0100 Subject: [PATCH 3/3] Add error handling in tagger initialization --- internal/tagger/tagger.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/tagger/tagger.go b/internal/tagger/tagger.go index 067f16a9..bde3817d 100644 --- a/internal/tagger/tagger.go +++ b/internal/tagger/tagger.go @@ -51,10 +51,14 @@ func newTagger() { jobTagger.stopTaggers = append(jobTagger.stopTaggers, &JobClassTagger{}) for _, tagger := range jobTagger.startTaggers { - tagger.Register() + if err := tagger.Register(); err != nil { + cclog.Errorf("failed to register start tagger: %s", err) + } } for _, tagger := range jobTagger.stopTaggers { - tagger.Register() + if err := tagger.Register(); err != nil { + cclog.Errorf("failed to register stop tagger: %s", err) + } } }