From a71341064e1921ffb329cc12b2a7f3c8cafc44db Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Fri, 30 Jan 2026 23:24:16 +0100 Subject: [PATCH 01/17] Update to MetricStore HealthCheck API --- .gitignore | 2 +- internal/api/metricstore.go | 11 +++- pkg/metricstore/healthcheck.go | 102 +++++++++++++++++++++------------ 3 files changed, 77 insertions(+), 38 deletions(-) diff --git a/.gitignore b/.gitignore index db9f922b..67dbb510 100644 --- a/.gitignore +++ b/.gitignore @@ -13,7 +13,7 @@ /var/checkpoints* migrateTimestamps.pl -test_ccms_write_api* +test_ccms_* /web/frontend/public/build /web/frontend/node_modules diff --git a/internal/api/metricstore.go b/internal/api/metricstore.go index d36df4bf..d99222d2 100644 --- a/internal/api/metricstore.go +++ b/internal/api/metricstore.go @@ -151,6 +151,7 @@ func debugMetrics(rw http.ResponseWriter, r *http.Request) { // @router /healthcheck/ [get] func metricsHealth(rw http.ResponseWriter, r *http.Request) { rawCluster := r.URL.Query().Get("cluster") + rawSubCluster := r.URL.Query().Get("subcluster") rawNode := r.URL.Query().Get("node") if rawCluster == "" || rawNode == "" { @@ -163,8 +164,16 @@ func metricsHealth(rw http.ResponseWriter, r *http.Request) { selector := []string{rawCluster, rawNode} ms := metricstore.GetMemoryStore() - if err := ms.HealthCheck(bufio.NewWriter(rw), selector); err != nil { + response, err := ms.HealthCheck(selector, rawSubCluster) + if err != nil { handleError(err, http.StatusBadRequest, rw) return } + + jsonData, err := json.Marshal(response) + if err != nil { + cclog.Errorf("Error marshaling HealthCheckResponse JSON: %s", err) + } + + rw.Write(jsonData) } diff --git a/pkg/metricstore/healthcheck.go b/pkg/metricstore/healthcheck.go index 2a49c47a..3dbf661a 100644 --- a/pkg/metricstore/healthcheck.go +++ b/pkg/metricstore/healthcheck.go @@ -6,26 +6,34 @@ package metricstore import ( - "bufio" "fmt" "time" + + "github.com/ClusterCockpit/cc-lib/v2/schema" ) +type HeathCheckResponse struct { + Status schema.MonitoringState + Error error + list List +} + +type List struct { + StaleNodeMetricList []string + StaleHardwareMetricList map[string][]string + MissingNodeMetricList []string + MissingHardwareMetricList map[string][]string +} + // MaxMissingDataPoints is a threshold that allows a node to be healthy with certain number of data points missing. // Suppose a node does not receive last 5 data points, then healthCheck endpoint will still say a // node is healthy. Anything more than 5 missing points in metrics of the node will deem the node unhealthy. const MaxMissingDataPoints int64 = 5 -// MaxUnhealthyMetrics is a threshold which allows upto certain number of metrics in a node to be unhealthly. -// Works with MaxMissingDataPoints. Say 5 metrics (including submetrics) do not receive the last -// MaxMissingDataPoints data points, then the node will be deemed healthy. Any more metrics that does -// not receive data for MaxMissingDataPoints data points will deem the node unhealthy. 
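To make the MaxMissingDataPoints rule concrete, here is a minimal, self-contained sketch of the staleness test that the reworked buffer.healthCheck() below applies; the 60-second frequency and the sample counts are invented for illustration, only the formula mirrors the patch:

	package main

	import (
		"fmt"
		"time"
	)

	// MaxMissingDataPoints mirrors the constant kept by this patch.
	const MaxMissingDataPoints int64 = 5

	// isStale reports whether a buffer's newest sample is too old: the gap
	// between now and the end of the buffer exceeds MaxMissingDataPoints * frequency.
	func isStale(start, frequency int64, numSamples int) bool {
		bufferEnd := start + frequency*int64(numSamples)
		return time.Now().Unix()-bufferEnd > MaxMissingDataPoints*frequency
	}

	func main() {
		now := time.Now().Unix()
		// Newest sample ~240s old at 60s resolution: 240 <= 300, still healthy.
		fmt.Println(isStale(now-600, 60, 6))
		// Newest sample ~540s old: 540 > 300, reported as stale.
		fmt.Println(isStale(now-900, 60, 6))
	}
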
-const MaxUnhealthyMetrics int64 = 5 - -func (b *buffer) healthCheck() int64 { +func (b *buffer) healthCheck() bool { // Check if the buffer is empty if b.data == nil { - return 1 + return true } bufferEnd := b.start + b.frequency*int64(len(b.data)) @@ -33,60 +41,82 @@ func (b *buffer) healthCheck() int64 { // Check if the buffer is too old if t-bufferEnd > MaxMissingDataPoints*b.frequency { - return 1 + return true } - return 0 + return false } -func (l *Level) healthCheck(m *MemoryStore, count int64) (int64, error) { +func (l *Level) healthCheck(m *MemoryStore) (List, error) { l.lock.RLock() defer l.lock.RUnlock() - for _, mc := range m.Metrics { + list := List{ + StaleNodeMetricList: make([]string, 0), + StaleHardwareMetricList: make(map[string][]string, 0), + MissingNodeMetricList: make([]string, 0), + MissingHardwareMetricList: make(map[string][]string, 0), + } + + for metricName, mc := range m.Metrics { if b := l.metrics[mc.offset]; b != nil { - count += b.healthCheck() + if b.healthCheck() { + list.StaleNodeMetricList = append(list.StaleNodeMetricList, metricName) + } + } else { + list.MissingNodeMetricList = append(list.MissingNodeMetricList, metricName) } } - for _, lvl := range l.children { - c, err := lvl.healthCheck(m, 0) + for hardwareMetricName, lvl := range l.children { + l, err := lvl.healthCheck(m) if err != nil { - return 0, err + return List{}, err + } + + if len(l.StaleNodeMetricList) != 0 { + list.StaleHardwareMetricList[hardwareMetricName] = l.StaleNodeMetricList + } + if len(l.MissingNodeMetricList) != 0 { + list.MissingHardwareMetricList[hardwareMetricName] = l.MissingNodeMetricList } - count += c } - return count, nil + return list, nil } -func (m *MemoryStore) HealthCheck(w *bufio.Writer, selector []string) error { +func (m *MemoryStore) HealthCheck(selector []string, subcluster string) (*HeathCheckResponse, error) { + response := HeathCheckResponse{ + Status: schema.MonitoringStateFull, + } + lvl := m.root.findLevel(selector) if lvl == nil { - return fmt.Errorf("[METRICSTORE]> not found: %#v", selector) + response.Status = schema.MonitoringStateFailed + response.Error = fmt.Errorf("[METRICSTORE]> error while HealthCheck, host not found: %#v", selector) + return &response, nil } - buf := make([]byte, 0, 25) - // buf = append(buf, "{"...) + var err error - var count int64 = 0 - - unhealthyMetricsCount, err := lvl.healthCheck(m, count) + response.list, err = lvl.healthCheck(m) if err != nil { - return err + return nil, err } - if unhealthyMetricsCount < MaxUnhealthyMetrics { - buf = append(buf, "Healthy"...) - } else { - buf = append(buf, "Unhealthy"...) + fmt.Printf("Response: %#v\n", response) + + if len(response.list.StaleNodeMetricList) != 0 || + len(response.list.StaleHardwareMetricList) != 0 { + response.Status = schema.MonitoringStatePartial + return &response, nil } - // buf = append(buf, "}\n"...) 
- - if _, err = w.Write(buf); err != nil { - return err + if len(response.list.MissingHardwareMetricList) != 0 || + len(response.list.MissingNodeMetricList) != 0 { + response.Status = schema.MonitoringStateFailed + return &response, nil } - return w.Flush() + return &response, nil } From 1791e665aa58e2768739c064c0e81c2bf1990a6b Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Sat, 31 Jan 2026 11:25:16 +0100 Subject: [PATCH 02/17] add default orders on list sorting change --- web/frontend/src/List.root.svelte | 44 ++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/web/frontend/src/List.root.svelte b/web/frontend/src/List.root.svelte index 108c42dd..7f13edf1 100644 --- a/web/frontend/src/List.root.svelte +++ b/web/frontend/src/List.root.svelte @@ -45,7 +45,7 @@ let filterComponent = $state(); // see why here: https://stackoverflow.com/questions/58287729/how-can-i-export-a-function-from-a-svelte-component-that-changes-a-value-in-the let jobFilters = $state([]); let nameFilter = $state(""); - let sorting = $state({ field: "totalJobs", direction: "down" }); + let sorting = $state({ field: "totalJobs", direction: "desc" }); /* Derived Vars */ let stats = $derived( @@ -67,21 +67,41 @@ ); /* Functions */ - function changeSorting(field) { - sorting = { field, direction: sorting?.direction == "down" ? "up" : "down" }; + function changeSorting(newField) { + if (sorting.field == newField) { + // Same Field, Change Direction + sorting = { field: newField, direction: sorting.direction == "desc" ? "asc" : "desc" }; + } else { + // Change Field, Apply Field Dependent Default + switch (newField) { + case "id": + case "name": + case "totalJobs": + case "totalWalltime": + sorting = { field: newField, direction: "desc" }; + break + case "totalCoreHours": + case "totalAccHours": + sorting = { field: newField, direction: "asc" }; + break + default: + // Fallback: Change only Field + sorting = { field: newField, direction: sorting.direction }; + } + } } function sort(stats, sorting, nameFilter) { - const idCmp = sorting.direction == "up" + const idCmp = sorting.direction == "asc" ? (a, b) => b.id.localeCompare(a.id) : (a, b) => a.id.localeCompare(b.id) // Force empty or undefined strings to the end of the list - const nameCmp = sorting.direction == "up" + const nameCmp = sorting.direction == "asc" ? (a, b) => !a?.name ? 1 : (!b?.name ? -1 : (b.name.localeCompare(a.name))) : (a, b) => !a?.name ? 1 : (!b?.name ? -1 : (a.name.localeCompare(b.name))) - const intCmp = sorting.direction == "up" + const intCmp = sorting.direction == "asc" ? 
(a, b) => a[sorting.field] - b[sorting.field] : (a, b) => b[sorting.field] - a[sorting.field]; @@ -141,7 +161,7 @@ > {#if sorting?.field == "id"} - + {:else} {/if} @@ -156,7 +176,7 @@ onclick={() => changeSorting("name")} > {#if sorting?.field == "name"} - + {:else} {/if} @@ -172,7 +192,7 @@ > {#if sorting?.field == "totalJobs"} - + {:else} {/if} @@ -186,7 +206,7 @@ onclick={() => changeSorting("totalWalltime")} > {#if sorting?.field == "totalWalltime"} - + {:else} {/if} @@ -200,7 +220,7 @@ onclick={() => changeSorting("totalCoreHours")} > {#if sorting?.field == "totalCoreHours"} - + {:else} {/if} @@ -214,7 +234,7 @@ onclick={() => changeSorting("totalAccHours")} > {#if sorting?.field == "totalAccHours"} - + {:else} {/if} From b7bd8210e5bbdb45ec1c6392f6a5c1a8051e8b67 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Sat, 31 Jan 2026 20:34:16 +0100 Subject: [PATCH 03/17] revert column defaults,keep general desc default order --- web/frontend/src/List.root.svelte | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/web/frontend/src/List.root.svelte b/web/frontend/src/List.root.svelte index 7f13edf1..eb81b4c7 100644 --- a/web/frontend/src/List.root.svelte +++ b/web/frontend/src/List.root.svelte @@ -72,22 +72,8 @@ // Same Field, Change Direction sorting = { field: newField, direction: sorting.direction == "desc" ? "asc" : "desc" }; } else { - // Change Field, Apply Field Dependent Default - switch (newField) { - case "id": - case "name": - case "totalJobs": - case "totalWalltime": - sorting = { field: newField, direction: "desc" }; - break - case "totalCoreHours": - case "totalAccHours": - sorting = { field: newField, direction: "asc" }; - break - default: - // Fallback: Change only Field - sorting = { field: newField, direction: sorting.direction }; - } + // Change Field, Apply Default Direction + sorting = { field: newField, direction: "desc" }; } } From f2285e603b0c951e2e9861869ee7012f5dc1e276 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Mon, 2 Feb 2026 10:38:05 +0100 Subject: [PATCH 04/17] fix disabled-false-positives, add info if no metrics selected --- web/frontend/src/generic/JobList.svelte | 4 +- .../src/generic/helper/JobFootprint.svelte | 2 +- .../src/generic/joblist/JobListRow.svelte | 72 ++++++++++--------- .../src/systems/nodelist/NodeListRow.svelte | 6 +- 4 files changed, 45 insertions(+), 39 deletions(-) diff --git a/web/frontend/src/generic/JobList.svelte b/web/frontend/src/generic/JobList.svelte index 17c8811f..0851abdc 100644 --- a/web/frontend/src/generic/JobList.svelte +++ b/web/frontend/src/generic/JobList.svelte @@ -32,7 +32,7 @@ let { matchedListJobs = $bindable(0), selectedJobs = $bindable([]), - metrics = getContext("cc-config").metricConfig_jobListMetrics, + metrics = [], sorting = { field: "startTime", type: "col", order: "DESC" }, showFootprint = false, filterBuffer = [], @@ -109,7 +109,7 @@ let paging = $derived({ itemsPerPage, page }); const plotWidth = $derived.by(() => { return Math.floor( - (tableWidth - jobInfoColumnWidth) / (metrics.length + (showFootprint ? 1 : 0)) - 10, + (tableWidth - jobInfoColumnWidth) / (metrics.length + (showFootprint ? 
2 : 1)) - 10, ); }); let jobsStore = $derived(queryStore({ diff --git a/web/frontend/src/generic/helper/JobFootprint.svelte b/web/frontend/src/generic/helper/JobFootprint.svelte index 06fe2b73..9eaf3ff9 100644 --- a/web/frontend/src/generic/helper/JobFootprint.svelte +++ b/web/frontend/src/generic/helper/JobFootprint.svelte @@ -133,7 +133,7 @@ } - + {#if displayTitle} diff --git a/web/frontend/src/generic/joblist/JobListRow.svelte b/web/frontend/src/generic/joblist/JobListRow.svelte index 17a160e1..9502a2f8 100644 --- a/web/frontend/src/generic/joblist/JobListRow.svelte +++ b/web/frontend/src/generic/joblist/JobListRow.svelte @@ -79,6 +79,7 @@ /* Derived */ const jobId = $derived(job?.id); + const refinedData = $derived($metricsQuery?.data?.jobMetrics ? sortAndSelectScope($metricsQuery.data.jobMetrics) : []); const scopes = $derived.by(() => { if (job.numNodes == 1) { if (job.numAcc >= 1) return ["core", "accelerator"]; @@ -202,40 +203,45 @@ /> {/if} - {#each sortAndSelectScope($metricsQuery.data.jobMetrics) as metric, i (metric?.name || i)} + {#each refinedData as metric, i (metric?.name || i)} - - {#if metric.disabled == false && metric.data} - handleZoom(detail, metric.data.name)} - height={plotHeight} - timestep={metric.data.metric.timestep} - scope={metric.data.scope} - series={metric.data.metric.series} - statisticsSeries={metric.data.metric.statisticsSeries} - metric={metric.data.name} - cluster={cluster.find((c) => c.name == job.cluster)} - subCluster={job.subCluster} - isShared={job.shared != "none"} - numhwthreads={job.numHWThreads} - numaccs={job.numAcc} - zoomState={zoomStates[metric.data.name] || null} - thresholdState={thresholdStates[metric.data.name] || null} - /> - {:else if metric.disabled == true && metric.data} - Metric disabled for subcluster {metric.data.name}:{job.subCluster} - {:else} - -

No dataset(s) returned for {metrics[i]}
Metric or host was not found in metric store for cluster {job.cluster}:
Identical messages in {metrics[i]} column: Metric not found.
Identical messages in job {job.jobId} row: Host not found.
- {/if} + {#key metric} + {#if metric?.data} + {#if metric?.disabled} + + Metric {metric.data.name}: Disabled for subcluster {job.subCluster} + + {:else} + handleZoom(detail, metric.data.name)} + height={plotHeight} + timestep={metric.data.metric.timestep} + scope={metric.data.scope} + series={metric.data.metric.series} + statisticsSeries={metric.data.metric.statisticsSeries} + metric={metric.data.name} + cluster={cluster.find((c) => c.name == job.cluster)} + subCluster={job.subCluster} + isShared={job.shared != "none"} + numhwthreads={job.numHWThreads} + numaccs={job.numAcc} + zoomState={zoomStates[metric.data.name] || null} + thresholdState={thresholdStates[metric.data.name] || null} + /> + {/if} + {:else} + +

No dataset(s) returned for {metrics[i]}
Metric or host was not found in metric store for cluster {job.cluster}:
Identical messages in {metrics[i]} column: Metric not found.
Identical messages in job {job.jobId} row: Host not found.
+ {/if} + {/key} + + {:else} + + No metrics selected for display. {/each} {/if} diff --git a/web/frontend/src/systems/nodelist/NodeListRow.svelte b/web/frontend/src/systems/nodelist/NodeListRow.svelte index 6c542cba..34c3d3ba 100644 --- a/web/frontend/src/systems/nodelist/NodeListRow.svelte +++ b/web/frontend/src/systems/nodelist/NodeListRow.svelte @@ -69,9 +69,9 @@ }) ); - let extendedLegendData = $derived($nodeJobsData?.data ? buildExtendedLegend() : null); - let refinedData = $derived(nodeData?.metrics ? sortAndSelectScope(nodeData.metrics) : null); - let dataHealth = $derived(refinedData.filter((rd) => rd.disabled === false).map((enabled) => (enabled?.data?.metric?.series?.length > 0))); + const extendedLegendData = $derived($nodeJobsData?.data ? buildExtendedLegend() : null); + const refinedData = $derived(nodeData?.metrics ? sortAndSelectScope(nodeData.metrics) : []); + const dataHealth = $derived(refinedData.filter((rd) => rd.disabled === false).map((enabled) => (enabled?.data?.metric?.series?.length > 0))); /* Functions */ const selectScope = (nodeMetrics) => From 7b4e2fcf59ab4ec92c0626e96efbbd42d8a1d92e Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Mon, 2 Feb 2026 14:34:49 +0100 Subject: [PATCH 05/17] add isNan to clusterMetric aggregation --- internal/graph/schema.resolvers.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index d7d6b675..fd5facf5 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -928,7 +928,11 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr // Sum if init'd and matching size if okData && len(ser.Data) == len(collectorData[metric]) { for i, val := range ser.Data { - collectorData[metric][i] += val + if val.IsNaN() { + continue + } else { + collectorData[metric][i] += val + } } } else if okData { cclog.Debugf("ClusterMetrics Skip Sum: Data Diff -> %s at %s; Want Size %d, Have Size %d", metric, ser.Hostname, len(collectorData[metric]), len(ser.Data)) From 13cca1ee62c8b800bfb5237501ba5aeae1136a81 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Mon, 2 Feb 2026 14:45:19 +0100 Subject: [PATCH 06/17] change log msg on clusterMetrics --- internal/graph/schema.resolvers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index fd5facf5..19d04eab 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -923,7 +923,7 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr if !okData && len(ser.Data) != 0 { collectorData[metric] = make([]schema.Float, len(ser.Data)) } else if !okData { - cclog.Debugf("ClusterMetrics Skip Init: No Data -> %s at %s; Size %d", metric, ser.Hostname, len(ser.Data)) + cclog.Debugf("[SCHEMARESOLVER] clusterMetrics skip init: no data -> %s at %s; size %d", metric, ser.Hostname, len(ser.Data)) } // Sum if init'd and matching size if okData && len(ser.Data) == len(collectorData[metric]) { @@ -935,7 +935,7 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr } } } else if okData { - cclog.Debugf("ClusterMetrics Skip Sum: Data Diff -> %s at %s; Want Size %d, Have Size %d", metric, ser.Hostname, len(collectorData[metric]), len(ser.Data)) + cclog.Debugf("[SCHEMARESOLVER] clusterMetrics skip sum: data diff -> %s at %s; want size %d, have size %d", metric, ser.Hostname, 
len(collectorData[metric]), len(ser.Data)) } } } From e9cd6b42253444e4ea53e0105b1c9b37ac2368de Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Mon, 2 Feb 2026 17:51:41 +0100 Subject: [PATCH 07/17] set updateNodeStates timeStamp once per request -prevents per-host timestamp mismatches due to handler iteration duration --- internal/api/nats.go | 3 ++- internal/api/node.go | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/api/nats.go b/internal/api/nats.go index bbbd151f..c0a8c174 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -324,11 +324,12 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) { } repo := repository.GetNodeRepository() + requestReceived := time.Now().Unix() for _, node := range req.Nodes { state := determineState(node.States) nodeState := schema.NodeStateDB{ - TimeStamp: time.Now().Unix(), + TimeStamp: requestReceived, NodeState: state, CpusAllocated: node.CpusAllocated, MemoryAllocated: node.MemoryAllocated, diff --git a/internal/api/node.go b/internal/api/node.go index 4ad5337a..c3fe8492 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -63,11 +63,13 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { return } repo := repository.GetNodeRepository() + requestReceived := time.Now().Unix() for _, node := range req.Nodes { state := determineState(node.States) nodeState := schema.NodeStateDB{ - TimeStamp: time.Now().Unix(), NodeState: state, + TimeStamp: requestReceived, + NodeState: state, CpusAllocated: node.CpusAllocated, MemoryAllocated: node.MemoryAllocated, GpusAllocated: node.GpusAllocated, From 00a41373e84e36701bfedb9510e94cdadc9b1ba1 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 3 Feb 2026 12:23:24 +0100 Subject: [PATCH 08/17] Add monitoring healthstate support in nodestate API. 
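As a reader aid for this patch, here is a brief, hedged sketch of how the new batch health check is meant to be consumed by the node state handler; the cluster, host, and metric names are invented, and the snippet assumes the HealthCheckAlt signature introduced below (returning a map of per-host NodeHealthState):

	// Illustrative only: the real handler below derives the host list from the
	// request payload and the metric names from the per-subcluster metric config.
	ms := metricstore.GetMemoryStore()
	expected := []string{"load", "mem_used", "cpu_user"}
	states, err := ms.HealthCheckAlt("fritz", []string{"f0101", "f0102"}, expected)
	if err == nil {
		for host, st := range states {
			// st.Status (MonitoringStateFull / Partial / Failed) is what ends up
			// in schema.NodeStateDB.HealthState for that host.
			fmt.Printf("%s -> %v (missing: %v)\n", host, st.Status, st.MissingMetrics)
		}
	}
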
--- internal/api/node.go | 40 +++- pkg/metricstore/healthcheck.go | 356 ++++++++++++++++++++++++++++ pkg/metricstore/metricstore_test.go | 217 +++++++++++++++++ 3 files changed, 611 insertions(+), 2 deletions(-) diff --git a/internal/api/node.go b/internal/api/node.go index c3fe8492..7039a06f 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -7,11 +7,14 @@ package api import ( "fmt" + "maps" "net/http" "strings" "time" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/metricstore" "github.com/ClusterCockpit/cc-lib/v2/schema" ) @@ -20,6 +23,15 @@ type UpdateNodeStatesRequest struct { Cluster string `json:"cluster" example:"fritz"` } +// metricListToNames converts a map of metric configurations to a list of metric names +func metricListToNames(metricList map[string]*schema.Metric) []string { + names := make([]string, 0, len(metricList)) + for name := range metricList { + names = append(names, name) + } + return names +} + // this routine assumes that only one of them exists per node func determineState(states []string) schema.SchedulerState { for _, state := range states { @@ -62,18 +74,42 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { http.StatusBadRequest, rw) return } - repo := repository.GetNodeRepository() requestReceived := time.Now().Unix() + repo := repository.GetNodeRepository() + ms := metricstore.GetMemoryStore() + + m := make(map[string][]string) + healthStates := make(map[string]metricstore.NodeHealthState) + + for _, node := range req.Nodes { + if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil { + m[sc] = append(m[sc], node.Hostname) + } + } + + for sc, nl := range m { + if sc != "" { + metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc) + metricNames := metricListToNames(metricList) + if states, err := ms.HealthCheckAlt(req.Cluster, nl, metricNames); err == nil { + maps.Copy(healthStates, states) + } + } + } for _, node := range req.Nodes { state := determineState(node.States) + healthState := schema.MonitoringStateFull + if hs, ok := healthStates[node.Hostname]; ok { + healthState = hs.Status + } nodeState := schema.NodeStateDB{ TimeStamp: requestReceived, NodeState: state, CpusAllocated: node.CpusAllocated, MemoryAllocated: node.MemoryAllocated, GpusAllocated: node.GpusAllocated, - HealthState: schema.MonitoringStateFull, + HealthState: healthState, JobsRunning: node.JobsRunning, } diff --git a/pkg/metricstore/healthcheck.go b/pkg/metricstore/healthcheck.go index 3dbf661a..a40394a3 100644 --- a/pkg/metricstore/healthcheck.go +++ b/pkg/metricstore/healthcheck.go @@ -47,6 +47,45 @@ func (b *buffer) healthCheck() bool { return false } +// healthCheck recursively examines a level and all its children to identify stale or missing metrics. 
+// +// This routine performs a two-phase check: +// +// Phase 1 - Check metrics at current level (node-level metrics): +// - Iterates through all configured metrics in m.Metrics +// - For each metric, checks if a buffer exists at l.metrics[mc.offset] +// - If buffer exists: calls buffer.healthCheck() to verify data freshness +// - Stale buffer (data older than MaxMissingDataPoints * frequency) → StaleNodeMetricList +// - Fresh buffer → healthy, no action +// - If buffer is nil: metric was never written → MissingNodeMetricList +// +// Phase 2 - Recursively check child levels (hardware-level metrics): +// - Iterates through l.children (e.g., "cpu0", "gpu0", "socket0") +// - Recursively calls healthCheck() on each child level +// - Aggregates child results into hardware-specific lists: +// - Child's StaleNodeMetricList → parent's StaleHardwareMetricList[childName] +// - Child's MissingNodeMetricList → parent's MissingHardwareMetricList[childName] +// +// The recursive nature means: +// - Calling on a host level checks: host metrics + all CPU/GPU/socket metrics +// - Calling on a socket level checks: socket metrics + all core metrics +// - Leaf levels (e.g., individual cores) only check their own metrics +// +// Parameters: +// - m: MemoryStore containing the global metric configuration (m.Metrics) +// +// Returns: +// - List: Categorized lists of stale and missing metrics at this level and below +// - error: Non-nil only for internal errors during recursion +// +// Concurrency: +// - Acquires read lock (RLock) to safely access l.metrics and l.children +// - Lock held for entire duration including recursive calls +// +// Example for host level with structure: host → [cpu0, cpu1]: +// - Checks host-level metrics (load, memory) → StaleNodeMetricList / MissingNodeMetricList +// - Recursively checks cpu0 metrics → results in StaleHardwareMetricList["cpu0"] +// - Recursively checks cpu1 metrics → results in StaleHardwareMetricList["cpu1"] func (l *Level) healthCheck(m *MemoryStore) (List, error) { l.lock.RLock() defer l.lock.RUnlock() @@ -58,6 +97,7 @@ func (l *Level) healthCheck(m *MemoryStore) (List, error) { MissingHardwareMetricList: make(map[string][]string, 0), } + // Phase 1: Check metrics at this level for metricName, mc := range m.Metrics { if b := l.metrics[mc.offset]; b != nil { if b.healthCheck() { @@ -68,6 +108,7 @@ func (l *Level) healthCheck(m *MemoryStore) (List, error) { } } + // Phase 2: Recursively check child levels (hardware components) for hardwareMetricName, lvl := range l.children { l, err := lvl.healthCheck(m) if err != nil { @@ -85,6 +126,48 @@ func (l *Level) healthCheck(m *MemoryStore) (List, error) { return list, nil } +// HealthCheck performs a health check on a specific node in the metric store. +// +// This routine checks whether metrics for a given node are being received and are up-to-date. +// It examines both node-level metrics (e.g., load, memory) and hardware-level metrics +// (e.g., CPU, GPU, network) to determine the monitoring state. +// +// Parameters: +// - selector: Hierarchical path to the target node, typically []string{cluster, hostname}. +// Example: []string{"emmy", "node001"} navigates to the "node001" host in the "emmy" cluster. +// The selector must match the hierarchy used during metric ingestion (see Level.findLevelOrCreate). 
+// - subcluster: Subcluster name (currently unused, reserved for future filtering) +// +// Returns: +// - *HeathCheckResponse: Health status with detailed lists of stale/missing metrics +// - error: Non-nil only for internal errors (not for unhealthy nodes) +// +// Health States: +// - MonitoringStateFull: All expected metrics are present and up-to-date +// - MonitoringStatePartial: Some metrics are stale (data older than MaxMissingDataPoints * frequency) +// - MonitoringStateFailed: Host not found, or metrics are completely missing +// +// The response includes detailed lists: +// - StaleNodeMetricList: Node-level metrics with stale data +// - StaleHardwareMetricList: Hardware-level metrics with stale data (grouped by component) +// - MissingNodeMetricList: Expected node-level metrics that have no data +// - MissingHardwareMetricList: Expected hardware-level metrics that have no data (grouped by component) +// +// Example usage: +// +// selector := []string{"emmy", "node001"} +// response, err := ms.HealthCheck(selector, "") +// if err != nil { +// // Internal error +// } +// switch response.Status { +// case schema.MonitoringStateFull: +// // All metrics healthy +// case schema.MonitoringStatePartial: +// // Check response.list.StaleNodeMetricList for details +// case schema.MonitoringStateFailed: +// // Check response.Error or response.list.MissingNodeMetricList +// } func (m *MemoryStore) HealthCheck(selector []string, subcluster string) (*HeathCheckResponse, error) { response := HeathCheckResponse{ Status: schema.MonitoringStateFull, @@ -120,3 +203,276 @@ func (m *MemoryStore) HealthCheck(selector []string, subcluster string) (*HeathC return &response, nil } + +// isBufferHealthy checks if a buffer has received data for the last MaxMissingDataPoints. +// +// Returns true if the buffer is healthy (recent data within threshold), false otherwise. +// A nil buffer or empty buffer is considered unhealthy. +func (b *buffer) isBufferHealthy() bool { + // Check if the buffer is empty + if b == nil || b.data == nil { + return false + } + + bufferEnd := b.start + b.frequency*int64(len(b.data)) + t := time.Now().Unix() + + // Check if the buffer has recent data (within MaxMissingDataPoints threshold) + if t-bufferEnd > MaxMissingDataPoints*b.frequency { + return false + } + + return true +} + +// countMissingValues counts the number of NaN (missing) values in the most recent data points. +// +// Examines the last MaxMissingDataPoints*2 values in the buffer and counts how many are NaN. +// We check twice the threshold to allow detecting when more than MaxMissingDataPoints are missing. +// If the buffer has fewer values, examines all available values. +// +// Returns: +// - int: Number of NaN values found in the examined range +func (b *buffer) countMissingValues() int { + if b == nil || b.data == nil || len(b.data) == 0 { + return 0 + } + + // Check twice the threshold to detect degraded metrics + checkCount := min(int(MaxMissingDataPoints)*2, len(b.data)) + + // Count NaN values in the most recent data points + missingCount := 0 + startIdx := len(b.data) - checkCount + for i := startIdx; i < len(b.data); i++ { + if b.data[i].IsNaN() { + missingCount++ + } + } + + return missingCount +} + +// getHealthyMetrics recursively collects healthy and degraded metrics at this level and below. 
+// +// A metric is considered: +// - Healthy: buffer has recent data within MaxMissingDataPoints threshold AND has few/no NaN values +// - Degraded: buffer exists and has recent data, but contains more than MaxMissingDataPoints NaN values +// +// This routine walks the entire subtree starting from the current level. +// +// Parameters: +// - m: MemoryStore containing the global metric configuration +// +// Returns: +// - []string: Flat list of healthy metric names from this level and all children +// - []string: Flat list of degraded metric names (exist but have too many missing values) +// - error: Non-nil only for internal errors during recursion +// +// The routine mirrors healthCheck() but provides more granular classification: +// - healthCheck() finds problems (stale/missing) +// - getHealthyMetrics() separates healthy from degraded metrics +func (l *Level) getHealthyMetrics(m *MemoryStore) ([]string, []string, error) { + l.lock.RLock() + defer l.lock.RUnlock() + + healthyList := make([]string, 0) + degradedList := make([]string, 0) + + // Phase 1: Check metrics at this level + for metricName, mc := range m.Metrics { + b := l.metrics[mc.offset] + if b.isBufferHealthy() { + // Buffer has recent data, now check for missing values + missingCount := b.countMissingValues() + if missingCount > int(MaxMissingDataPoints) { + degradedList = append(degradedList, metricName) + } else { + healthyList = append(healthyList, metricName) + } + } + } + + // Phase 2: Recursively check child levels (hardware components) + for _, lvl := range l.children { + childHealthy, childDegraded, err := lvl.getHealthyMetrics(m) + if err != nil { + return nil, nil, err + } + + // Merge child metrics into flat lists + healthyList = append(healthyList, childHealthy...) + degradedList = append(degradedList, childDegraded...) + } + + return healthyList, degradedList, nil +} + +// GetHealthyMetrics returns healthy and degraded metrics for a specific node as flat lists. +// +// This routine walks the metric tree starting from the specified node selector +// and collects all metrics that have received data within the last MaxMissingDataPoints +// (default: 5 data points). Metrics are classified into two categories: +// +// - Healthy: Buffer has recent data AND contains few/no NaN (missing) values +// - Degraded: Buffer has recent data BUT contains more than MaxMissingDataPoints NaN values +// +// The returned lists include both node-level metrics (e.g., "load", "mem_used") and +// hardware-level metrics (e.g., "cpu_user", "gpu_temp") in flat slices. +// +// Parameters: +// - selector: Hierarchical path to the target node, typically []string{cluster, hostname}. +// Example: []string{"emmy", "node001"} navigates to the "node001" host in the "emmy" cluster. +// The selector must match the hierarchy used during metric ingestion. +// +// Returns: +// - []string: Flat list of healthy metric names (recent data, few missing values) +// - []string: Flat list of degraded metric names (recent data, many missing values) +// - error: Non-nil if the node is not found or internal errors occur +// +// Example usage: +// +// selector := []string{"emmy", "node001"} +// healthyMetrics, degradedMetrics, err := ms.GetHealthyMetrics(selector) +// if err != nil { +// // Node not found or internal error +// return err +// } +// fmt.Printf("Healthy metrics: %v\n", healthyMetrics) +// // Output: ["load", "mem_used", "cpu_user", ...] +// fmt.Printf("Degraded metrics: %v\n", degradedMetrics) +// // Output: ["gpu_temp", "network_rx", ...] 
(metrics with many NaN values) +// +// Note: This routine provides more granular classification than HealthCheck: +// - HealthCheck reports stale/missing metrics (problems) +// - GetHealthyMetrics separates fully healthy from degraded metrics (quality levels) +func (m *MemoryStore) GetHealthyMetrics(selector []string) ([]string, []string, error) { + lvl := m.root.findLevel(selector) + if lvl == nil { + return nil, nil, fmt.Errorf("[METRICSTORE]> error while GetHealthyMetrics, host not found: %#v", selector) + } + + healthyList, degradedList, err := lvl.getHealthyMetrics(m) + if err != nil { + return nil, nil, err + } + + return healthyList, degradedList, nil +} + +// NodeHealthState represents the health status of a single node's metrics. +type NodeHealthState struct { + Status schema.MonitoringState // Overall health status: Full, Partial, or Failed + HealthyMetrics []string // Metrics with recent data and few missing values + DegradedMetrics []string // Metrics with recent data but many missing values + MissingMetrics []string // Expected metrics that are completely missing or stale +} + +// HealthCheckAlt performs health checks on multiple nodes and returns their health states. +// +// This routine provides a batch health check interface that evaluates multiple nodes +// against a specific set of expected metrics. For each node, it determines which metrics +// are healthy, degraded, or missing, and assigns an overall health status. +// +// Health Status Classification: +// - MonitoringStateFull: All expected metrics are healthy (recent data, few missing values) +// - MonitoringStatePartial: Some metrics are degraded (many missing values) or missing +// - MonitoringStateFailed: Node not found or all expected metrics are missing/stale +// +// Parameters: +// - cluster: Cluster name (first element of selector path) +// - nodes: List of node hostnames to check +// - expectedMetrics: List of metric names that should be present on each node +// +// Returns: +// - map[string]NodeHealthState: Map keyed by hostname containing health state for each node +// - error: Non-nil only for internal errors (individual node failures are captured in NodeHealthState) +// +// Example usage: +// +// cluster := "emmy" +// nodes := []string{"node001", "node002", "node003"} +// expectedMetrics := []string{"load", "mem_used", "cpu_user", "cpu_system"} +// healthStates, err := ms.HealthCheckAlt(cluster, nodes, expectedMetrics) +// if err != nil { +// return err +// } +// for hostname, state := range healthStates { +// fmt.Printf("Node %s: %s\n", hostname, state.Status) +// fmt.Printf(" Healthy: %v\n", state.HealthyMetrics) +// fmt.Printf(" Degraded: %v\n", state.DegradedMetrics) +// fmt.Printf(" Missing: %v\n", state.MissingMetrics) +// } +// +// Note: This routine is optimized for batch operations where you need to check +// the same set of metrics across multiple nodes. For single-node checks with +// all configured metrics, use HealthCheck() instead. 
+func (m *MemoryStore) HealthCheckAlt(cluster string, + nodes []string, expectedMetrics []string, +) (map[string]NodeHealthState, error) { + results := make(map[string]NodeHealthState, len(nodes)) + + // Create a set of expected metrics for fast lookup + expectedSet := make(map[string]bool, len(expectedMetrics)) + for _, metric := range expectedMetrics { + expectedSet[metric] = true + } + + // Check each node + for _, hostname := range nodes { + selector := []string{cluster, hostname} + state := NodeHealthState{ + Status: schema.MonitoringStateFull, + HealthyMetrics: make([]string, 0), + DegradedMetrics: make([]string, 0), + MissingMetrics: make([]string, 0), + } + + // Get healthy and degraded metrics for this node + healthyList, degradedList, err := m.GetHealthyMetrics(selector) + if err != nil { + // Node not found or internal error + state.Status = schema.MonitoringStateFailed + state.MissingMetrics = expectedMetrics + results[hostname] = state + continue + } + + // Create sets for fast lookup + healthySet := make(map[string]bool, len(healthyList)) + for _, metric := range healthyList { + healthySet[metric] = true + } + degradedSet := make(map[string]bool, len(degradedList)) + for _, metric := range degradedList { + degradedSet[metric] = true + } + + // Classify each expected metric + for _, metric := range expectedMetrics { + if healthySet[metric] { + state.HealthyMetrics = append(state.HealthyMetrics, metric) + } else if degradedSet[metric] { + state.DegradedMetrics = append(state.DegradedMetrics, metric) + } else { + state.MissingMetrics = append(state.MissingMetrics, metric) + } + } + + // Determine overall health status + if len(state.MissingMetrics) > 0 || len(state.DegradedMetrics) > 0 { + if len(state.HealthyMetrics) == 0 { + // No healthy metrics at all + state.Status = schema.MonitoringStateFailed + } else { + // Some healthy, some degraded/missing + state.Status = schema.MonitoringStatePartial + } + } + // else: all metrics healthy, status remains MonitoringStateFull + + results[hostname] = state + } + + return results, nil +} diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go index 90cec2bd..70ef73f8 100644 --- a/pkg/metricstore/metricstore_test.go +++ b/pkg/metricstore/metricstore_test.go @@ -7,6 +7,7 @@ package metricstore import ( "testing" + "time" "github.com/ClusterCockpit/cc-lib/v2/schema" ) @@ -88,3 +89,219 @@ func TestBufferRead(t *testing.T) { t.Errorf("buffer.read() len(result) = %d, want 3", len(result)) } } + +func TestHealthCheckAlt(t *testing.T) { + // Create a test MemoryStore with some metrics + metrics := map[string]MetricConfig{ + "load": {Frequency: 10, Aggregation: AvgAggregation, offset: 0}, + "mem_used": {Frequency: 10, Aggregation: AvgAggregation, offset: 1}, + "cpu_user": {Frequency: 10, Aggregation: AvgAggregation, offset: 2}, + "cpu_system": {Frequency: 10, Aggregation: AvgAggregation, offset: 3}, + } + + ms := &MemoryStore{ + Metrics: metrics, + root: Level{ + metrics: make([]*buffer, len(metrics)), + children: make(map[string]*Level), + }, + } + + // Use recent timestamps (current time minus a small offset) + now := time.Now().Unix() + startTime := now - 100 // Start 100 seconds ago to have enough data points + + // Setup test data for node001 - all metrics healthy + node001 := ms.root.findLevelOrCreate([]string{"testcluster", "node001"}, len(metrics)) + for i := 0; i < len(metrics); i++ { + node001.metrics[i] = newBuffer(startTime, 10) + // Write recent data with no NaN values + for ts := startTime; ts <= 
now; ts += 10 { + node001.metrics[i].write(ts, schema.Float(float64(i+1))) + } + } + + // Setup test data for node002 - some metrics degraded (many NaN values) + node002 := ms.root.findLevelOrCreate([]string{"testcluster", "node002"}, len(metrics)) + for i := 0; i < len(metrics); i++ { + node002.metrics[i] = newBuffer(startTime, 10) + if i < 2 { + // First two metrics: healthy (no NaN) + for ts := startTime; ts <= now; ts += 10 { + node002.metrics[i].write(ts, schema.Float(float64(i+1))) + } + } else { + // Last two metrics: degraded (many NaN values in recent data) + // Write real values first, then NaN values at the end + count := 0 + for ts := startTime; ts <= now; ts += 10 { + if count < 5 { + // Write first 5 real values + node002.metrics[i].write(ts, schema.Float(float64(i+1))) + } else { + // Write NaN for the rest (last ~6 values will be NaN) + node002.metrics[i].write(ts, schema.NaN) + } + count++ + } + } + } + + // Setup test data for node003 - some metrics missing (no buffer) + node003 := ms.root.findLevelOrCreate([]string{"testcluster", "node003"}, len(metrics)) + // Only create buffers for first two metrics + for i := 0; i < 2; i++ { + node003.metrics[i] = newBuffer(startTime, 10) + for ts := startTime; ts <= now; ts += 10 { + node003.metrics[i].write(ts, schema.Float(float64(i+1))) + } + } + // Leave metrics[2] and metrics[3] as nil (missing) + + // node004 doesn't exist at all + + tests := []struct { + name string + cluster string + nodes []string + expectedMetrics []string + wantStates map[string]schema.MonitoringState + wantHealthyCounts map[string]int + wantDegradedCounts map[string]int + wantMissingCounts map[string]int + }{ + { + name: "all metrics healthy", + cluster: "testcluster", + nodes: []string{"node001"}, + expectedMetrics: []string{"load", "mem_used", "cpu_user", "cpu_system"}, + wantStates: map[string]schema.MonitoringState{ + "node001": schema.MonitoringStateFull, + }, + wantHealthyCounts: map[string]int{"node001": 4}, + wantDegradedCounts: map[string]int{"node001": 0}, + wantMissingCounts: map[string]int{"node001": 0}, + }, + { + name: "some metrics degraded", + cluster: "testcluster", + nodes: []string{"node002"}, + expectedMetrics: []string{"load", "mem_used", "cpu_user", "cpu_system"}, + wantStates: map[string]schema.MonitoringState{ + "node002": schema.MonitoringStatePartial, + }, + wantHealthyCounts: map[string]int{"node002": 2}, + wantDegradedCounts: map[string]int{"node002": 2}, + wantMissingCounts: map[string]int{"node002": 0}, + }, + { + name: "some metrics missing", + cluster: "testcluster", + nodes: []string{"node003"}, + expectedMetrics: []string{"load", "mem_used", "cpu_user", "cpu_system"}, + wantStates: map[string]schema.MonitoringState{ + "node003": schema.MonitoringStatePartial, + }, + wantHealthyCounts: map[string]int{"node003": 2}, + wantDegradedCounts: map[string]int{"node003": 0}, + wantMissingCounts: map[string]int{"node003": 2}, + }, + { + name: "node not found", + cluster: "testcluster", + nodes: []string{"node004"}, + expectedMetrics: []string{"load", "mem_used", "cpu_user", "cpu_system"}, + wantStates: map[string]schema.MonitoringState{ + "node004": schema.MonitoringStateFailed, + }, + wantHealthyCounts: map[string]int{"node004": 0}, + wantDegradedCounts: map[string]int{"node004": 0}, + wantMissingCounts: map[string]int{"node004": 4}, + }, + { + name: "multiple nodes mixed states", + cluster: "testcluster", + nodes: []string{"node001", "node002", "node003", "node004"}, + expectedMetrics: []string{"load", "mem_used"}, + wantStates: 
map[string]schema.MonitoringState{ + "node001": schema.MonitoringStateFull, + "node002": schema.MonitoringStateFull, + "node003": schema.MonitoringStateFull, + "node004": schema.MonitoringStateFailed, + }, + wantHealthyCounts: map[string]int{ + "node001": 2, + "node002": 2, + "node003": 2, + "node004": 0, + }, + wantDegradedCounts: map[string]int{ + "node001": 0, + "node002": 0, + "node003": 0, + "node004": 0, + }, + wantMissingCounts: map[string]int{ + "node001": 0, + "node002": 0, + "node003": 0, + "node004": 2, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + results, err := ms.HealthCheckAlt(tt.cluster, tt.nodes, tt.expectedMetrics) + if err != nil { + t.Errorf("HealthCheckAlt() error = %v", err) + return + } + + // Check that we got results for all nodes + if len(results) != len(tt.nodes) { + t.Errorf("HealthCheckAlt() returned %d results, want %d", len(results), len(tt.nodes)) + } + + // Check each node's state + for _, node := range tt.nodes { + state, ok := results[node] + if !ok { + t.Errorf("HealthCheckAlt() missing result for node %s", node) + continue + } + + // Check status + if wantStatus, ok := tt.wantStates[node]; ok { + if state.Status != wantStatus { + t.Errorf("HealthCheckAlt() node %s status = %v, want %v", node, state.Status, wantStatus) + } + } + + // Check healthy count + if wantCount, ok := tt.wantHealthyCounts[node]; ok { + if len(state.HealthyMetrics) != wantCount { + t.Errorf("HealthCheckAlt() node %s healthy count = %d, want %d (metrics: %v)", + node, len(state.HealthyMetrics), wantCount, state.HealthyMetrics) + } + } + + // Check degraded count + if wantCount, ok := tt.wantDegradedCounts[node]; ok { + if len(state.DegradedMetrics) != wantCount { + t.Errorf("HealthCheckAlt() node %s degraded count = %d, want %d (metrics: %v)", + node, len(state.DegradedMetrics), wantCount, state.DegradedMetrics) + } + } + + // Check missing count + if wantCount, ok := tt.wantMissingCounts[node]; ok { + if len(state.MissingMetrics) != wantCount { + t.Errorf("HealthCheckAlt() node %s missing count = %d, want %d (metrics: %v)", + node, len(state.MissingMetrics), wantCount, state.MissingMetrics) + } + } + } + }) + } +} From d39b955b25a9d08f52ea05451a3ef0dd99360147 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Tue, 3 Feb 2026 14:34:53 +0100 Subject: [PATCH 09/17] fix doubleMetric xlegend label --- web/frontend/src/generic/plots/DoubleMetricPlot.svelte | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/frontend/src/generic/plots/DoubleMetricPlot.svelte b/web/frontend/src/generic/plots/DoubleMetricPlot.svelte index 10e01311..f3cea881 100644 --- a/web/frontend/src/generic/plots/DoubleMetricPlot.svelte +++ b/web/frontend/src/generic/plots/DoubleMetricPlot.svelte @@ -79,7 +79,7 @@ // X let pendingSeries = [ { - label: "Runtime", + label: "Time", value: (u, ts, sidx, didx) => (didx == null) ? 
null : formatDurationTime(ts, forNode), } From 248f11f4f85b3ddd7b81a444bef080bc9935e0ab Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 3 Feb 2026 14:55:12 +0100 Subject: [PATCH 10/17] Change API of Node HealthState --- internal/api/node.go | 4 +- pkg/metricstore/healthcheck.go | 57 +++++++++-------------- pkg/metricstore/metricstore_test.go | 71 +++-------------------------- 3 files changed, 30 insertions(+), 102 deletions(-) diff --git a/internal/api/node.go b/internal/api/node.go index 7039a06f..853b23e1 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -79,7 +79,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { ms := metricstore.GetMemoryStore() m := make(map[string][]string) - healthStates := make(map[string]metricstore.NodeHealthState) + healthStates := make(map[string]schema.MonitoringState) for _, node := range req.Nodes { if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil { @@ -101,7 +101,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { state := determineState(node.States) healthState := schema.MonitoringStateFull if hs, ok := healthStates[node.Hostname]; ok { - healthState = hs.Status + healthState = hs } nodeState := schema.NodeStateDB{ TimeStamp: requestReceived, diff --git a/pkg/metricstore/healthcheck.go b/pkg/metricstore/healthcheck.go index a40394a3..5ab26466 100644 --- a/pkg/metricstore/healthcheck.go +++ b/pkg/metricstore/healthcheck.go @@ -360,19 +360,11 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string) ([]string, []string, return healthyList, degradedList, nil } -// NodeHealthState represents the health status of a single node's metrics. -type NodeHealthState struct { - Status schema.MonitoringState // Overall health status: Full, Partial, or Failed - HealthyMetrics []string // Metrics with recent data and few missing values - DegradedMetrics []string // Metrics with recent data but many missing values - MissingMetrics []string // Expected metrics that are completely missing or stale -} - -// HealthCheckAlt performs health checks on multiple nodes and returns their health states. +// HealthCheckAlt performs health checks on multiple nodes and returns their monitoring states. // // This routine provides a batch health check interface that evaluates multiple nodes -// against a specific set of expected metrics. For each node, it determines which metrics -// are healthy, degraded, or missing, and assigns an overall health status. +// against a specific set of expected metrics. For each node, it determines the overall +// monitoring state based on which metrics are healthy, degraded, or missing. 
// // Health Status Classification: // - MonitoringStateFull: All expected metrics are healthy (recent data, few missing values) @@ -385,8 +377,8 @@ type NodeHealthState struct { // - expectedMetrics: List of metric names that should be present on each node // // Returns: -// - map[string]NodeHealthState: Map keyed by hostname containing health state for each node -// - error: Non-nil only for internal errors (individual node failures are captured in NodeHealthState) +// - map[string]schema.MonitoringState: Map keyed by hostname containing monitoring state for each node +// - error: Non-nil only for internal errors (individual node failures are captured as MonitoringStateFailed) // // Example usage: // @@ -398,10 +390,7 @@ type NodeHealthState struct { // return err // } // for hostname, state := range healthStates { -// fmt.Printf("Node %s: %s\n", hostname, state.Status) -// fmt.Printf(" Healthy: %v\n", state.HealthyMetrics) -// fmt.Printf(" Degraded: %v\n", state.DegradedMetrics) -// fmt.Printf(" Missing: %v\n", state.MissingMetrics) +// fmt.Printf("Node %s: %s\n", hostname, state) // } // // Note: This routine is optimized for batch operations where you need to check @@ -409,8 +398,8 @@ type NodeHealthState struct { // all configured metrics, use HealthCheck() instead. func (m *MemoryStore) HealthCheckAlt(cluster string, nodes []string, expectedMetrics []string, -) (map[string]NodeHealthState, error) { - results := make(map[string]NodeHealthState, len(nodes)) +) (map[string]schema.MonitoringState, error) { + results := make(map[string]schema.MonitoringState, len(nodes)) // Create a set of expected metrics for fast lookup expectedSet := make(map[string]bool, len(expectedMetrics)) @@ -421,20 +410,16 @@ func (m *MemoryStore) HealthCheckAlt(cluster string, // Check each node for _, hostname := range nodes { selector := []string{cluster, hostname} - state := NodeHealthState{ - Status: schema.MonitoringStateFull, - HealthyMetrics: make([]string, 0), - DegradedMetrics: make([]string, 0), - MissingMetrics: make([]string, 0), - } + status := schema.MonitoringStateFull + healthyCount := 0 + degradedCount := 0 + missingCount := 0 // Get healthy and degraded metrics for this node healthyList, degradedList, err := m.GetHealthyMetrics(selector) if err != nil { // Node not found or internal error - state.Status = schema.MonitoringStateFailed - state.MissingMetrics = expectedMetrics - results[hostname] = state + results[hostname] = schema.MonitoringStateFailed continue } @@ -451,27 +436,27 @@ func (m *MemoryStore) HealthCheckAlt(cluster string, // Classify each expected metric for _, metric := range expectedMetrics { if healthySet[metric] { - state.HealthyMetrics = append(state.HealthyMetrics, metric) + healthyCount++ } else if degradedSet[metric] { - state.DegradedMetrics = append(state.DegradedMetrics, metric) + degradedCount++ } else { - state.MissingMetrics = append(state.MissingMetrics, metric) + missingCount++ } } // Determine overall health status - if len(state.MissingMetrics) > 0 || len(state.DegradedMetrics) > 0 { - if len(state.HealthyMetrics) == 0 { + if missingCount > 0 || degradedCount > 0 { + if healthyCount == 0 { // No healthy metrics at all - state.Status = schema.MonitoringStateFailed + status = schema.MonitoringStateFailed } else { // Some healthy, some degraded/missing - state.Status = schema.MonitoringStatePartial + status = schema.MonitoringStatePartial } } // else: all metrics healthy, status remains MonitoringStateFull - results[hostname] = state + results[hostname] = status } 
return results, nil diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go index 70ef73f8..e0fcfea5 100644 --- a/pkg/metricstore/metricstore_test.go +++ b/pkg/metricstore/metricstore_test.go @@ -161,14 +161,11 @@ func TestHealthCheckAlt(t *testing.T) { // node004 doesn't exist at all tests := []struct { - name string - cluster string - nodes []string - expectedMetrics []string - wantStates map[string]schema.MonitoringState - wantHealthyCounts map[string]int - wantDegradedCounts map[string]int - wantMissingCounts map[string]int + name string + cluster string + nodes []string + expectedMetrics []string + wantStates map[string]schema.MonitoringState }{ { name: "all metrics healthy", @@ -178,9 +175,6 @@ func TestHealthCheckAlt(t *testing.T) { wantStates: map[string]schema.MonitoringState{ "node001": schema.MonitoringStateFull, }, - wantHealthyCounts: map[string]int{"node001": 4}, - wantDegradedCounts: map[string]int{"node001": 0}, - wantMissingCounts: map[string]int{"node001": 0}, }, { name: "some metrics degraded", @@ -190,9 +184,6 @@ func TestHealthCheckAlt(t *testing.T) { wantStates: map[string]schema.MonitoringState{ "node002": schema.MonitoringStatePartial, }, - wantHealthyCounts: map[string]int{"node002": 2}, - wantDegradedCounts: map[string]int{"node002": 2}, - wantMissingCounts: map[string]int{"node002": 0}, }, { name: "some metrics missing", @@ -202,9 +193,6 @@ func TestHealthCheckAlt(t *testing.T) { wantStates: map[string]schema.MonitoringState{ "node003": schema.MonitoringStatePartial, }, - wantHealthyCounts: map[string]int{"node003": 2}, - wantDegradedCounts: map[string]int{"node003": 0}, - wantMissingCounts: map[string]int{"node003": 2}, }, { name: "node not found", @@ -214,9 +202,6 @@ func TestHealthCheckAlt(t *testing.T) { wantStates: map[string]schema.MonitoringState{ "node004": schema.MonitoringStateFailed, }, - wantHealthyCounts: map[string]int{"node004": 0}, - wantDegradedCounts: map[string]int{"node004": 0}, - wantMissingCounts: map[string]int{"node004": 4}, }, { name: "multiple nodes mixed states", @@ -229,24 +214,6 @@ func TestHealthCheckAlt(t *testing.T) { "node003": schema.MonitoringStateFull, "node004": schema.MonitoringStateFailed, }, - wantHealthyCounts: map[string]int{ - "node001": 2, - "node002": 2, - "node003": 2, - "node004": 0, - }, - wantDegradedCounts: map[string]int{ - "node001": 0, - "node002": 0, - "node003": 0, - "node004": 0, - }, - wantMissingCounts: map[string]int{ - "node001": 0, - "node002": 0, - "node003": 0, - "node004": 2, - }, }, } @@ -273,32 +240,8 @@ func TestHealthCheckAlt(t *testing.T) { // Check status if wantStatus, ok := tt.wantStates[node]; ok { - if state.Status != wantStatus { - t.Errorf("HealthCheckAlt() node %s status = %v, want %v", node, state.Status, wantStatus) - } - } - - // Check healthy count - if wantCount, ok := tt.wantHealthyCounts[node]; ok { - if len(state.HealthyMetrics) != wantCount { - t.Errorf("HealthCheckAlt() node %s healthy count = %d, want %d (metrics: %v)", - node, len(state.HealthyMetrics), wantCount, state.HealthyMetrics) - } - } - - // Check degraded count - if wantCount, ok := tt.wantDegradedCounts[node]; ok { - if len(state.DegradedMetrics) != wantCount { - t.Errorf("HealthCheckAlt() node %s degraded count = %d, want %d (metrics: %v)", - node, len(state.DegradedMetrics), wantCount, state.DegradedMetrics) - } - } - - // Check missing count - if wantCount, ok := tt.wantMissingCounts[node]; ok { - if len(state.MissingMetrics) != wantCount { - t.Errorf("HealthCheckAlt() node %s 
missing count = %d, want %d (metrics: %v)", - node, len(state.MissingMetrics), wantCount, state.MissingMetrics) + if state != wantStatus { + t.Errorf("HealthCheckAlt() node %s status = %v, want %v", node, state, wantStatus) } } } From 3cf88f757cf338a67c70aa2e2e4383e4f6b61011 Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Tue, 3 Feb 2026 16:25:48 +0100 Subject: [PATCH 11/17] Update to checkpoint loader in CCMS --- pkg/metricstore/archive.go | 3 +- pkg/metricstore/checkpoint.go | 83 +++++++++-------------------------- 2 files changed, 21 insertions(+), 65 deletions(-) diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index cab4c24f..784348b5 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -158,8 +158,7 @@ func cleanupCheckpoints(dir string, cleanupDir string, from int64, deleteInstead return 0, err } - extension := Keys.Checkpoints.FileFormat - files, err := findFiles(entries, from, extension, false) + files, err := findFiles(entries, from, false) if err != nil { return 0, err } diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index 715566e4..b4097ff2 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -415,7 +415,7 @@ func enqueueCheckpointHosts(dir string, work chan<- [2]string) error { // // Uses worker pool to load cluster/host combinations. Periodically triggers GC // to prevent excessive heap growth. Returns number of files loaded and any errors. -func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) { +func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { var wg sync.WaitGroup work := make(chan [2]string, Keys.NumWorkers*4) n, errs := int32(0), int32(0) @@ -426,7 +426,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) ( defer wg.Done() for host := range work { lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics)) - nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from, extension) + nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from) if err != nil { cclog.Errorf("[METRICSTORE]> error while loading checkpoints for %s/%s: %s", host[0], host[1], err.Error()) atomic.AddInt32(&errs, 1) @@ -465,57 +465,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { cclog.Debugf("[METRICSTORE]> %#v Directory created successfully", dir) } - // Config read (replace with your actual config read) - fileFormat := Keys.Checkpoints.FileFormat - if fileFormat == "" { - fileFormat = "avro" - } - - // Map to easily get the fallback format - oppositeFormat := map[string]string{ - "json": "avro", - "avro": "json", - } - - // First, attempt to load the specified format - if found, err := checkFilesWithExtension(dir, fileFormat); err != nil { - return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err) - } else if found { - cclog.Infof("[METRICSTORE]> Loading %s files because fileformat is %s", fileFormat, fileFormat) - return m.FromCheckpoint(dir, from, fileFormat) - } - - // If not found, attempt the opposite format - altFormat := oppositeFormat[fileFormat] - if found, err := checkFilesWithExtension(dir, altFormat); err != nil { - return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err) - } else if found { - cclog.Infof("[METRICSTORE]> Loading %s files but fileformat is %s", altFormat, fileFormat) - return m.FromCheckpoint(dir, from, altFormat) - } - - return 0, nil -} - -// 
checkFilesWithExtension walks a directory tree to check if files with the given extension exist. -func checkFilesWithExtension(dir string, extension string) (bool, error) { - found := false - - err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return fmt.Errorf("[METRICSTORE]> error accessing path %s: %v", path, err) - } - if !info.IsDir() && filepath.Ext(info.Name()) == "."+extension { - found = true - return nil - } - return nil - }) - if err != nil { - return false, fmt.Errorf("[METRICSTORE]> error walking through directories: %s", err) - } - - return found, nil + return m.FromCheckpoint(dir, from) } func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { @@ -729,7 +679,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { return nil } -func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension string) (int, error) { +func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, error) { direntries, err := os.ReadDir(dir) if err != nil { if os.IsNotExist(err) { @@ -748,33 +698,38 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension children: make(map[string]*Level), } - files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from, extension) + files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from) filesLoaded += files if err != nil { return filesLoaded, err } l.children[e.Name()] = child - } else if strings.HasSuffix(e.Name(), "."+extension) { + } else if strings.HasSuffix(e.Name(), ".json") || strings.HasSuffix(e.Name(), ".avro") { allFiles = append(allFiles, e) } else { continue } } - files, err := findFiles(allFiles, from, extension, true) + files, err := findFiles(allFiles, from, true) if err != nil { return filesLoaded, err } loaders := map[string]func(*MemoryStore, *os.File, int64) error{ - "json": l.loadJSONFile, - "avro": l.loadAvroFile, + ".json": l.loadJSONFile, + ".avro": l.loadAvroFile, } - loader := loaders[extension] - for _, filename := range files { + ext := filepath.Ext(filename) + loader := loaders[ext] + if loader == nil { + cclog.Warnf("Unknown extension for file %s", filename) + continue + } + // Use a closure to ensure file is closed immediately after use err := func() error { f, err := os.Open(path.Join(dir, filename)) @@ -798,10 +753,12 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension // This will probably get very slow over time! // A solution could be some sort of an index file in which all other files // and the timespan they contain is listed. -func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRecentFiles bool) ([]string, error) { +// NOTE: This now assumes that you have distinct timestamps for json and avro files +// Also, it assumes that the timestamps are not overlapping/self-modified. 
+func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) { nums := map[string]int64{} for _, e := range direntries { - if !strings.HasSuffix(e.Name(), "."+extension) { + if !strings.HasSuffix(e.Name(), ".json") && !strings.HasSuffix(e.Name(), ".avro") { continue } From 0d62a300e7dd9b3bd4c15a59250c6f1b073a293f Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Tue, 3 Feb 2026 18:35:17 +0100 Subject: [PATCH 12/17] Intermediate state of node Healthcheck TODOS: * Remove error handling from routine and simplify API call * Use map for hardware level metrics --- internal/api/node.go | 2 +- pkg/metricstore/healthcheck.go | 13 +++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/internal/api/node.go b/internal/api/node.go index 853b23e1..ce8a263a 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -99,7 +99,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { for _, node := range req.Nodes { state := determineState(node.States) - healthState := schema.MonitoringStateFull + healthState := schema.MonitoringStateFailed if hs, ok := healthStates[node.Hostname]; ok { healthState = hs } diff --git a/pkg/metricstore/healthcheck.go b/pkg/metricstore/healthcheck.go index 5ab26466..da5ccc7d 100644 --- a/pkg/metricstore/healthcheck.go +++ b/pkg/metricstore/healthcheck.go @@ -283,23 +283,20 @@ func (l *Level) getHealthyMetrics(m *MemoryStore) ([]string, []string, error) { for metricName, mc := range m.Metrics { b := l.metrics[mc.offset] if b.isBufferHealthy() { - // Buffer has recent data, now check for missing values - missingCount := b.countMissingValues() - if missingCount > int(MaxMissingDataPoints) { - degradedList = append(degradedList, metricName) - } else { - healthyList = append(healthyList, metricName) - } + healthyList = append(healthyList, metricName) + } else { + degradedList = append(degradedList, metricName) } } - // Phase 2: Recursively check child levels (hardware components) + // Phase 2: Recursively check child levels for _, lvl := range l.children { childHealthy, childDegraded, err := lvl.getHealthyMetrics(m) if err != nil { return nil, nil, err } + // FIXME: Use a map to collect core level metrics // Merge child metrics into flat lists healthyList = append(healthyList, childHealthy...) degradedList = append(degradedList, childDegraded...) 
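For reference, the health classification in the patches above and below hinges on one staleness rule: a buffer's last covered timestamp is start + frequency*len(data), and the buffer counts as stale once that timestamp lags the current time by more than MaxMissingDataPoints sampling intervals. The following is a minimal, self-contained sketch of that rule only; sampleBuffer and isStale are illustrative stand-ins, not the store's actual buffer type or API.

package main

import (
	"fmt"
	"time"
)

// MaxMissingDataPoints mirrors the threshold from pkg/metricstore/healthcheck.go:
// up to this many sampling intervals without data are still tolerated.
const MaxMissingDataPoints int64 = 5

// sampleBuffer is a simplified stand-in for the store's buffer type, carrying
// only the fields the staleness rule needs.
type sampleBuffer struct {
	start     int64     // unix timestamp of the first sample
	frequency int64     // sampling interval in seconds
	data      []float64 // one slot per interval
}

// isStale reports whether the newest covered timestamp lags the current time
// by more than MaxMissingDataPoints sampling intervals.
func (b *sampleBuffer) isStale(now int64) bool {
	if b == nil || len(b.data) == 0 {
		return true // a buffer that was never written counts as stale here
	}
	bufferEnd := b.start + b.frequency*int64(len(b.data))
	return now-bufferEnd > MaxMissingDataPoints*b.frequency
}

func main() {
	now := time.Now().Unix()
	fresh := &sampleBuffer{start: now - 100, frequency: 10, data: make([]float64, 10)} // ends roughly now
	stale := &sampleBuffer{start: now - 200, frequency: 10, data: make([]float64, 10)} // ends ~100s ago
	fmt.Println(fresh.isStale(now), stale.isStale(now))                                // false true
}

With frequency 10 and the threshold of 5, anything whose last sample is older than 50 seconds falls out of the healthy set, which is exactly the boundary the unit tests in the later patches exercise.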
From 39b8356683eb4f3e470f2f8477035d94c54e06f2 Mon Sep 17 00:00:00 2001 From: Aditya Ujeniya Date: Wed, 4 Feb 2026 10:24:45 +0100 Subject: [PATCH 13/17] Optimized CCMS healthcheck --- internal/api/metricstore.go | 42 ---- internal/api/node.go | 2 +- internal/api/rest.go | 6 +- pkg/metricstore/healthcheck.go | 290 +++++----------------------- pkg/metricstore/metricstore_test.go | 2 +- 5 files changed, 52 insertions(+), 290 deletions(-) diff --git a/internal/api/metricstore.go b/internal/api/metricstore.go index d99222d2..5c15bb2c 100644 --- a/internal/api/metricstore.go +++ b/internal/api/metricstore.go @@ -135,45 +135,3 @@ func debugMetrics(rw http.ResponseWriter, r *http.Request) { return } } - -// handleHealthCheck godoc -// @summary HealthCheck endpoint -// @tags healthcheck -// @description This endpoint allows the users to check if a node is healthy -// @produce json -// @param selector query string false "Selector" -// @success 200 {string} string "Debug dump" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /healthcheck/ [get] -func metricsHealth(rw http.ResponseWriter, r *http.Request) { - rawCluster := r.URL.Query().Get("cluster") - rawSubCluster := r.URL.Query().Get("subcluster") - rawNode := r.URL.Query().Get("node") - - if rawCluster == "" || rawNode == "" { - handleError(errors.New("'cluster' and 'node' are required query parameter"), http.StatusBadRequest, rw) - return - } - - rw.Header().Add("Content-Type", "application/json") - - selector := []string{rawCluster, rawNode} - - ms := metricstore.GetMemoryStore() - response, err := ms.HealthCheck(selector, rawSubCluster) - if err != nil { - handleError(err, http.StatusBadRequest, rw) - return - } - - jsonData, err := json.Marshal(response) - if err != nil { - cclog.Errorf("Error marshaling HealthCheckResponse JSON: %s", err) - } - - rw.Write(jsonData) -} diff --git a/internal/api/node.go b/internal/api/node.go index ce8a263a..37a8576c 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -91,7 +91,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { if sc != "" { metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc) metricNames := metricListToNames(metricList) - if states, err := ms.HealthCheckAlt(req.Cluster, nl, metricNames); err == nil { + if states, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil { maps.Copy(healthStates, states) } } diff --git a/internal/api/rest.go b/internal/api/rest.go index 0d52742e..3f6d9609 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -81,7 +81,7 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) { // Cluster List r.HandleFunc("/clusters/", api.getClusters).Methods(http.MethodGet) // Slurm node state - r.HandleFunc("/nodestate/", api.updateNodeStates).Methods(http.MethodPost, http.MethodPut) + r.HandleFunc("/nodestates/", api.updateNodeStates).Methods(http.MethodPost, http.MethodPut) // Job Handler if config.Keys.APISubjects == nil { cclog.Info("Enabling REST start/stop job API") @@ -127,12 +127,12 @@ func (api *RestAPI) MountMetricStoreAPIRoutes(r *mux.Router) { r.HandleFunc("/free", freeMetrics).Methods(http.MethodPost) r.HandleFunc("/write", writeMetrics).Methods(http.MethodPost) r.HandleFunc("/debug", debugMetrics).Methods(http.MethodGet) - r.HandleFunc("/healthcheck", 
metricsHealth).Methods(http.MethodGet) + r.HandleFunc("/healthcheck", api.updateNodeStates).Methods(http.MethodPost) // Same endpoints but with trailing slash r.HandleFunc("/free/", freeMetrics).Methods(http.MethodPost) r.HandleFunc("/write/", writeMetrics).Methods(http.MethodPost) r.HandleFunc("/debug/", debugMetrics).Methods(http.MethodGet) - r.HandleFunc("/healthcheck/", metricsHealth).Methods(http.MethodGet) + r.HandleFunc("/healthcheck/", api.updateNodeStates).Methods(http.MethodPost) } // MountConfigAPIRoutes registers configuration and user management endpoints. diff --git a/pkg/metricstore/healthcheck.go b/pkg/metricstore/healthcheck.go index da5ccc7d..8e8e7952 100644 --- a/pkg/metricstore/healthcheck.go +++ b/pkg/metricstore/healthcheck.go @@ -6,7 +6,9 @@ package metricstore import ( + "cmp" "fmt" + "slices" "time" "github.com/ClusterCockpit/cc-lib/v2/schema" @@ -15,14 +17,6 @@ import ( type HeathCheckResponse struct { Status schema.MonitoringState Error error - list List -} - -type List struct { - StaleNodeMetricList []string - StaleHardwareMetricList map[string][]string - MissingNodeMetricList []string - MissingHardwareMetricList map[string][]string } // MaxMissingDataPoints is a threshold that allows a node to be healthy with certain number of data points missing. @@ -30,178 +24,17 @@ type List struct { // node is healthy. Anything more than 5 missing points in metrics of the node will deem the node unhealthy. const MaxMissingDataPoints int64 = 5 -func (b *buffer) healthCheck() bool { +// isBufferHealthy checks if a buffer has received data for the last MaxMissingDataPoints. +// +// Returns true if the buffer is healthy (recent data within threshold), false otherwise. +// A nil buffer or empty buffer is considered unhealthy. +func (b *buffer) bufferExists() bool { // Check if the buffer is empty - if b.data == nil { - return true + if b == nil || b.data == nil || len(b.data) == 0 { + return false } - bufferEnd := b.start + b.frequency*int64(len(b.data)) - t := time.Now().Unix() - - // Check if the buffer is too old - if t-bufferEnd > MaxMissingDataPoints*b.frequency { - return true - } - - return false -} - -// healthCheck recursively examines a level and all its children to identify stale or missing metrics. 
-// -// This routine performs a two-phase check: -// -// Phase 1 - Check metrics at current level (node-level metrics): -// - Iterates through all configured metrics in m.Metrics -// - For each metric, checks if a buffer exists at l.metrics[mc.offset] -// - If buffer exists: calls buffer.healthCheck() to verify data freshness -// - Stale buffer (data older than MaxMissingDataPoints * frequency) → StaleNodeMetricList -// - Fresh buffer → healthy, no action -// - If buffer is nil: metric was never written → MissingNodeMetricList -// -// Phase 2 - Recursively check child levels (hardware-level metrics): -// - Iterates through l.children (e.g., "cpu0", "gpu0", "socket0") -// - Recursively calls healthCheck() on each child level -// - Aggregates child results into hardware-specific lists: -// - Child's StaleNodeMetricList → parent's StaleHardwareMetricList[childName] -// - Child's MissingNodeMetricList → parent's MissingHardwareMetricList[childName] -// -// The recursive nature means: -// - Calling on a host level checks: host metrics + all CPU/GPU/socket metrics -// - Calling on a socket level checks: socket metrics + all core metrics -// - Leaf levels (e.g., individual cores) only check their own metrics -// -// Parameters: -// - m: MemoryStore containing the global metric configuration (m.Metrics) -// -// Returns: -// - List: Categorized lists of stale and missing metrics at this level and below -// - error: Non-nil only for internal errors during recursion -// -// Concurrency: -// - Acquires read lock (RLock) to safely access l.metrics and l.children -// - Lock held for entire duration including recursive calls -// -// Example for host level with structure: host → [cpu0, cpu1]: -// - Checks host-level metrics (load, memory) → StaleNodeMetricList / MissingNodeMetricList -// - Recursively checks cpu0 metrics → results in StaleHardwareMetricList["cpu0"] -// - Recursively checks cpu1 metrics → results in StaleHardwareMetricList["cpu1"] -func (l *Level) healthCheck(m *MemoryStore) (List, error) { - l.lock.RLock() - defer l.lock.RUnlock() - - list := List{ - StaleNodeMetricList: make([]string, 0), - StaleHardwareMetricList: make(map[string][]string, 0), - MissingNodeMetricList: make([]string, 0), - MissingHardwareMetricList: make(map[string][]string, 0), - } - - // Phase 1: Check metrics at this level - for metricName, mc := range m.Metrics { - if b := l.metrics[mc.offset]; b != nil { - if b.healthCheck() { - list.StaleNodeMetricList = append(list.StaleNodeMetricList, metricName) - } - } else { - list.MissingNodeMetricList = append(list.MissingNodeMetricList, metricName) - } - } - - // Phase 2: Recursively check child levels (hardware components) - for hardwareMetricName, lvl := range l.children { - l, err := lvl.healthCheck(m) - if err != nil { - return List{}, err - } - - if len(l.StaleNodeMetricList) != 0 { - list.StaleHardwareMetricList[hardwareMetricName] = l.StaleNodeMetricList - } - if len(l.MissingNodeMetricList) != 0 { - list.MissingHardwareMetricList[hardwareMetricName] = l.MissingNodeMetricList - } - } - - return list, nil -} - -// HealthCheck performs a health check on a specific node in the metric store. -// -// This routine checks whether metrics for a given node are being received and are up-to-date. -// It examines both node-level metrics (e.g., load, memory) and hardware-level metrics -// (e.g., CPU, GPU, network) to determine the monitoring state. -// -// Parameters: -// - selector: Hierarchical path to the target node, typically []string{cluster, hostname}. 
-// Example: []string{"emmy", "node001"} navigates to the "node001" host in the "emmy" cluster. -// The selector must match the hierarchy used during metric ingestion (see Level.findLevelOrCreate). -// - subcluster: Subcluster name (currently unused, reserved for future filtering) -// -// Returns: -// - *HeathCheckResponse: Health status with detailed lists of stale/missing metrics -// - error: Non-nil only for internal errors (not for unhealthy nodes) -// -// Health States: -// - MonitoringStateFull: All expected metrics are present and up-to-date -// - MonitoringStatePartial: Some metrics are stale (data older than MaxMissingDataPoints * frequency) -// - MonitoringStateFailed: Host not found, or metrics are completely missing -// -// The response includes detailed lists: -// - StaleNodeMetricList: Node-level metrics with stale data -// - StaleHardwareMetricList: Hardware-level metrics with stale data (grouped by component) -// - MissingNodeMetricList: Expected node-level metrics that have no data -// - MissingHardwareMetricList: Expected hardware-level metrics that have no data (grouped by component) -// -// Example usage: -// -// selector := []string{"emmy", "node001"} -// response, err := ms.HealthCheck(selector, "") -// if err != nil { -// // Internal error -// } -// switch response.Status { -// case schema.MonitoringStateFull: -// // All metrics healthy -// case schema.MonitoringStatePartial: -// // Check response.list.StaleNodeMetricList for details -// case schema.MonitoringStateFailed: -// // Check response.Error or response.list.MissingNodeMetricList -// } -func (m *MemoryStore) HealthCheck(selector []string, subcluster string) (*HeathCheckResponse, error) { - response := HeathCheckResponse{ - Status: schema.MonitoringStateFull, - } - - lvl := m.root.findLevel(selector) - if lvl == nil { - response.Status = schema.MonitoringStateFailed - response.Error = fmt.Errorf("[METRICSTORE]> error while HealthCheck, host not found: %#v", selector) - return &response, nil - } - - var err error - - response.list, err = lvl.healthCheck(m) - if err != nil { - return nil, err - } - - fmt.Printf("Response: %#v\n", response) - - if len(response.list.StaleNodeMetricList) != 0 || - len(response.list.StaleHardwareMetricList) != 0 { - response.Status = schema.MonitoringStatePartial - return &response, nil - } - - if len(response.list.MissingHardwareMetricList) != 0 || - len(response.list.MissingNodeMetricList) != 0 { - response.Status = schema.MonitoringStateFailed - return &response, nil - } - - return &response, nil + return true } // isBufferHealthy checks if a buffer has received data for the last MaxMissingDataPoints. @@ -209,11 +42,7 @@ func (m *MemoryStore) HealthCheck(selector []string, subcluster string) (*HeathC // Returns true if the buffer is healthy (recent data within threshold), false otherwise. // A nil buffer or empty buffer is considered unhealthy. func (b *buffer) isBufferHealthy() bool { - // Check if the buffer is empty - if b == nil || b.data == nil { - return false - } - + // Get the last endtime of the buffer bufferEnd := b.start + b.frequency*int64(len(b.data)) t := time.Now().Unix() @@ -225,32 +54,20 @@ func (b *buffer) isBufferHealthy() bool { return true } -// countMissingValues counts the number of NaN (missing) values in the most recent data points. -// -// Examines the last MaxMissingDataPoints*2 values in the buffer and counts how many are NaN. -// We check twice the threshold to allow detecting when more than MaxMissingDataPoints are missing. 
-// If the buffer has fewer values, examines all available values. -// -// Returns: -// - int: Number of NaN values found in the examined range -func (b *buffer) countMissingValues() int { - if b == nil || b.data == nil || len(b.data) == 0 { - return 0 - } +// MergeUniqueSorted merges two lists, sorts them, and removes duplicates. +// Requires 'cmp.Ordered' because we need to sort the data. +func mergeList[string cmp.Ordered](list1, list2 []string) []string { + // 1. Combine both lists + result := append(list1, list2...) - // Check twice the threshold to detect degraded metrics - checkCount := min(int(MaxMissingDataPoints)*2, len(b.data)) + // 2. Sort the combined list + slices.Sort(result) - // Count NaN values in the most recent data points - missingCount := 0 - startIdx := len(b.data) - checkCount - for i := startIdx; i < len(b.data); i++ { - if b.data[i].IsNaN() { - missingCount++ - } - } + // 3. Compact removes consecutive duplicates (standard in Go 1.21+) + // e.g. [1, 1, 2, 3, 3] -> [1, 2, 3] + result = slices.Compact(result) - return missingCount + return result } // getHealthyMetrics recursively collects healthy and degraded metrics at this level and below. @@ -272,37 +89,39 @@ func (b *buffer) countMissingValues() int { // The routine mirrors healthCheck() but provides more granular classification: // - healthCheck() finds problems (stale/missing) // - getHealthyMetrics() separates healthy from degraded metrics -func (l *Level) getHealthyMetrics(m *MemoryStore) ([]string, []string, error) { +func (l *Level) getHealthyMetrics(m *MemoryStore, expectedMetrics []string) ([]string, []string, error) { l.lock.RLock() defer l.lock.RUnlock() - healthyList := make([]string, 0) + globalMetrics := m.Metrics + + missingList := make([]string, 0) degradedList := make([]string, 0) // Phase 1: Check metrics at this level - for metricName, mc := range m.Metrics { - b := l.metrics[mc.offset] - if b.isBufferHealthy() { - healthyList = append(healthyList, metricName) - } else { + for _, metricName := range expectedMetrics { + offset := globalMetrics[metricName].offset + b := l.metrics[offset] + + if !b.bufferExists() { + missingList = append(missingList, metricName) + } else if !b.isBufferHealthy() { degradedList = append(degradedList, metricName) } } // Phase 2: Recursively check child levels for _, lvl := range l.children { - childHealthy, childDegraded, err := lvl.getHealthyMetrics(m) + childMissing, childDegraded, err := lvl.getHealthyMetrics(m, expectedMetrics) if err != nil { return nil, nil, err } - // FIXME: Use a map to collect core level metrics - // Merge child metrics into flat lists - healthyList = append(healthyList, childHealthy...) - degradedList = append(degradedList, childDegraded...) + missingList = mergeList(missingList, childMissing) + degradedList = mergeList(degradedList, childDegraded) } - return healthyList, degradedList, nil + return missingList, degradedList, nil } // GetHealthyMetrics returns healthy and degraded metrics for a specific node as flat lists. 
@@ -343,18 +162,18 @@ func (l *Level) getHealthyMetrics(m *MemoryStore) ([]string, []string, error) { // Note: This routine provides more granular classification than HealthCheck: // - HealthCheck reports stale/missing metrics (problems) // - GetHealthyMetrics separates fully healthy from degraded metrics (quality levels) -func (m *MemoryStore) GetHealthyMetrics(selector []string) ([]string, []string, error) { +func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []string) ([]string, []string, error) { lvl := m.root.findLevel(selector) if lvl == nil { return nil, nil, fmt.Errorf("[METRICSTORE]> error while GetHealthyMetrics, host not found: %#v", selector) } - healthyList, degradedList, err := lvl.getHealthyMetrics(m) + missingList, degradedList, err := lvl.getHealthyMetrics(m, expectedMetrics) if err != nil { return nil, nil, err } - return healthyList, degradedList, nil + return missingList, degradedList, nil } // HealthCheckAlt performs health checks on multiple nodes and returns their monitoring states. @@ -393,7 +212,7 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string) ([]string, []string, // Note: This routine is optimized for batch operations where you need to check // the same set of metrics across multiple nodes. For single-node checks with // all configured metrics, use HealthCheck() instead. -func (m *MemoryStore) HealthCheckAlt(cluster string, +func (m *MemoryStore) HealthCheck(cluster string, nodes []string, expectedMetrics []string, ) (map[string]schema.MonitoringState, error) { results := make(map[string]schema.MonitoringState, len(nodes)) @@ -413,33 +232,16 @@ func (m *MemoryStore) HealthCheckAlt(cluster string, missingCount := 0 // Get healthy and degraded metrics for this node - healthyList, degradedList, err := m.GetHealthyMetrics(selector) + missingList, degradedList, err := m.GetHealthyMetrics(selector, expectedMetrics) if err != nil { // Node not found or internal error results[hostname] = schema.MonitoringStateFailed continue } - // Create sets for fast lookup - healthySet := make(map[string]bool, len(healthyList)) - for _, metric := range healthyList { - healthySet[metric] = true - } - degradedSet := make(map[string]bool, len(degradedList)) - for _, metric := range degradedList { - degradedSet[metric] = true - } - - // Classify each expected metric - for _, metric := range expectedMetrics { - if healthySet[metric] { - healthyCount++ - } else if degradedSet[metric] { - degradedCount++ - } else { - missingCount++ - } - } + missingCount = len(missingList) + degradedCount = len(degradedList) + healthyCount = len(expectedMetrics) - (missingCount + degradedCount) // Determine overall health status if missingCount > 0 || degradedCount > 0 { @@ -456,5 +258,7 @@ func (m *MemoryStore) HealthCheckAlt(cluster string, results[hostname] = status } + fmt.Printf("Results : %#v\n\n", results) + return results, nil } diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go index e0fcfea5..035aa1be 100644 --- a/pkg/metricstore/metricstore_test.go +++ b/pkg/metricstore/metricstore_test.go @@ -219,7 +219,7 @@ func TestHealthCheckAlt(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - results, err := ms.HealthCheckAlt(tt.cluster, tt.nodes, tt.expectedMetrics) + results, err := ms.HealthCheck(tt.cluster, tt.nodes, tt.expectedMetrics) if err != nil { t.Errorf("HealthCheckAlt() error = %v", err) return From 46fb52d67e51eb4f9ce0935e0fac106cc6fb4ebd Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: 
Wed, 4 Feb 2026 12:30:33 +0100 Subject: [PATCH 14/17] Adopt documentation --- pkg/metricstore/healthcheck.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/metricstore/healthcheck.go b/pkg/metricstore/healthcheck.go index 8e8e7952..f390749d 100644 --- a/pkg/metricstore/healthcheck.go +++ b/pkg/metricstore/healthcheck.go @@ -14,7 +14,11 @@ import ( "github.com/ClusterCockpit/cc-lib/v2/schema" ) -type HeathCheckResponse struct { +// HealthCheckResponse represents the result of a health check operation. +// +// Status indicates the monitoring state (Full, Partial, Failed). +// Error contains any error encountered during the health check. +type HealthCheckResponse struct { Status schema.MonitoringState Error error } @@ -176,7 +180,7 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []str return missingList, degradedList, nil } -// HealthCheckAlt performs health checks on multiple nodes and returns their monitoring states. +// HealthCheck performs health checks on multiple nodes and returns their monitoring states. // // This routine provides a batch health check interface that evaluates multiple nodes // against a specific set of expected metrics. For each node, it determines the overall @@ -201,7 +205,7 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []str // cluster := "emmy" // nodes := []string{"node001", "node002", "node003"} // expectedMetrics := []string{"load", "mem_used", "cpu_user", "cpu_system"} -// healthStates, err := ms.HealthCheckAlt(cluster, nodes, expectedMetrics) +// healthStates, err := ms.HealthCheck(cluster, nodes, expectedMetrics) // if err != nil { // return err // } @@ -210,8 +214,7 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []str // } // // Note: This routine is optimized for batch operations where you need to check -// the same set of metrics across multiple nodes. For single-node checks with -// all configured metrics, use HealthCheck() instead. +// the same set of metrics across multiple nodes. 
func (m *MemoryStore) HealthCheck(cluster string, nodes []string, expectedMetrics []string, ) (map[string]schema.MonitoringState, error) { @@ -258,7 +261,5 @@ func (m *MemoryStore) HealthCheck(cluster string, results[hostname] = status } - fmt.Printf("Results : %#v\n\n", results) - return results, nil } From 5d7dd62b72f677a4291d264b5520ba0842b92a57 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 4 Feb 2026 12:53:24 +0100 Subject: [PATCH 15/17] Update unit test for new HealthCheck update --- pkg/metricstore/metricstore_test.go | 270 +++++++++++++++++++++++++--- 1 file changed, 243 insertions(+), 27 deletions(-) diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go index 035aa1be..f96f49a2 100644 --- a/pkg/metricstore/metricstore_test.go +++ b/pkg/metricstore/metricstore_test.go @@ -90,7 +90,7 @@ func TestBufferRead(t *testing.T) { } } -func TestHealthCheckAlt(t *testing.T) { +func TestHealthCheck(t *testing.T) { // Create a test MemoryStore with some metrics metrics := map[string]MetricConfig{ "load": {Frequency: 10, Aggregation: AvgAggregation, offset: 0}, @@ -111,38 +111,31 @@ func TestHealthCheckAlt(t *testing.T) { now := time.Now().Unix() startTime := now - 100 // Start 100 seconds ago to have enough data points - // Setup test data for node001 - all metrics healthy + // Setup test data for node001 - all metrics healthy (recent data) node001 := ms.root.findLevelOrCreate([]string{"testcluster", "node001"}, len(metrics)) for i := 0; i < len(metrics); i++ { node001.metrics[i] = newBuffer(startTime, 10) - // Write recent data with no NaN values + // Write recent data up to now for ts := startTime; ts <= now; ts += 10 { node001.metrics[i].write(ts, schema.Float(float64(i+1))) } } - // Setup test data for node002 - some metrics degraded (many NaN values) + // Setup test data for node002 - some metrics stale (old data beyond MaxMissingDataPoints threshold) node002 := ms.root.findLevelOrCreate([]string{"testcluster", "node002"}, len(metrics)) + // MaxMissingDataPoints = 5, frequency = 10, so threshold is 50 seconds + staleTime := now - 100 // Data ends 100 seconds ago (well beyond 50 second threshold) for i := 0; i < len(metrics); i++ { - node002.metrics[i] = newBuffer(startTime, 10) + node002.metrics[i] = newBuffer(staleTime-50, 10) if i < 2 { - // First two metrics: healthy (no NaN) + // First two metrics: healthy (recent data) for ts := startTime; ts <= now; ts += 10 { node002.metrics[i].write(ts, schema.Float(float64(i+1))) } } else { - // Last two metrics: degraded (many NaN values in recent data) - // Write real values first, then NaN values at the end - count := 0 - for ts := startTime; ts <= now; ts += 10 { - if count < 5 { - // Write first 5 real values - node002.metrics[i].write(ts, schema.Float(float64(i+1))) - } else { - // Write NaN for the rest (last ~6 values will be NaN) - node002.metrics[i].write(ts, schema.NaN) - } - count++ + // Last two metrics: stale (data ends 100 seconds ago) + for ts := staleTime - 50; ts <= staleTime; ts += 10 { + node002.metrics[i].write(ts, schema.Float(float64(i+1))) } } } @@ -158,6 +151,16 @@ func TestHealthCheckAlt(t *testing.T) { } // Leave metrics[2] and metrics[3] as nil (missing) + // Setup test data for node005 - all metrics stale + node005 := ms.root.findLevelOrCreate([]string{"testcluster", "node005"}, len(metrics)) + for i := 0; i < len(metrics); i++ { + node005.metrics[i] = newBuffer(staleTime-50, 10) + // All metrics have stale data (ends 100 seconds ago) + for ts := staleTime - 50; ts <= 
staleTime; ts += 10 { + node005.metrics[i].write(ts, schema.Float(float64(i+1))) + } + } + // node004 doesn't exist at all tests := []struct { @@ -177,7 +180,7 @@ func TestHealthCheckAlt(t *testing.T) { }, }, { - name: "some metrics degraded", + name: "some metrics stale", cluster: "testcluster", nodes: []string{"node002"}, expectedMetrics: []string{"load", "mem_used", "cpu_user", "cpu_system"}, @@ -203,16 +206,26 @@ func TestHealthCheckAlt(t *testing.T) { "node004": schema.MonitoringStateFailed, }, }, + { + name: "all metrics stale", + cluster: "testcluster", + nodes: []string{"node005"}, + expectedMetrics: []string{"load", "mem_used", "cpu_user", "cpu_system"}, + wantStates: map[string]schema.MonitoringState{ + "node005": schema.MonitoringStateFailed, + }, + }, { name: "multiple nodes mixed states", cluster: "testcluster", - nodes: []string{"node001", "node002", "node003", "node004"}, + nodes: []string{"node001", "node002", "node003", "node004", "node005"}, expectedMetrics: []string{"load", "mem_used"}, wantStates: map[string]schema.MonitoringState{ "node001": schema.MonitoringStateFull, - "node002": schema.MonitoringStateFull, - "node003": schema.MonitoringStateFull, - "node004": schema.MonitoringStateFailed, + "node002": schema.MonitoringStateFull, // Only checking first 2 metrics which are healthy + "node003": schema.MonitoringStateFull, // Only checking first 2 metrics which exist + "node004": schema.MonitoringStateFailed, // Node doesn't exist + "node005": schema.MonitoringStateFailed, // Both metrics are stale }, }, } @@ -221,30 +234,233 @@ func TestHealthCheckAlt(t *testing.T) { t.Run(tt.name, func(t *testing.T) { results, err := ms.HealthCheck(tt.cluster, tt.nodes, tt.expectedMetrics) if err != nil { - t.Errorf("HealthCheckAlt() error = %v", err) + t.Errorf("HealthCheck() error = %v", err) return } // Check that we got results for all nodes if len(results) != len(tt.nodes) { - t.Errorf("HealthCheckAlt() returned %d results, want %d", len(results), len(tt.nodes)) + t.Errorf("HealthCheck() returned %d results, want %d", len(results), len(tt.nodes)) } // Check each node's state for _, node := range tt.nodes { state, ok := results[node] if !ok { - t.Errorf("HealthCheckAlt() missing result for node %s", node) + t.Errorf("HealthCheck() missing result for node %s", node) continue } // Check status if wantStatus, ok := tt.wantStates[node]; ok { if state != wantStatus { - t.Errorf("HealthCheckAlt() node %s status = %v, want %v", node, state, wantStatus) + t.Errorf("HealthCheck() node %s status = %v, want %v", node, state, wantStatus) } } } }) } } + +// TestGetHealthyMetrics tests the GetHealthyMetrics function which returns lists of missing and degraded metrics +func TestGetHealthyMetrics(t *testing.T) { + metrics := map[string]MetricConfig{ + "load": {Frequency: 10, Aggregation: AvgAggregation, offset: 0}, + "mem_used": {Frequency: 10, Aggregation: AvgAggregation, offset: 1}, + "cpu_user": {Frequency: 10, Aggregation: AvgAggregation, offset: 2}, + } + + ms := &MemoryStore{ + Metrics: metrics, + root: Level{ + metrics: make([]*buffer, len(metrics)), + children: make(map[string]*Level), + }, + } + + now := time.Now().Unix() + startTime := now - 100 + staleTime := now - 100 + + // Setup node with mixed health states + node := ms.root.findLevelOrCreate([]string{"testcluster", "testnode"}, len(metrics)) + + // Metric 0 (load): healthy - recent data + node.metrics[0] = newBuffer(startTime, 10) + for ts := startTime; ts <= now; ts += 10 { + node.metrics[0].write(ts, schema.Float(1.0)) + } + + 
// Metric 1 (mem_used): degraded - stale data + node.metrics[1] = newBuffer(staleTime-50, 10) + for ts := staleTime - 50; ts <= staleTime; ts += 10 { + node.metrics[1].write(ts, schema.Float(2.0)) + } + + // Metric 2 (cpu_user): missing - no buffer (nil) + + tests := []struct { + name string + selector []string + expectedMetrics []string + wantMissing []string + wantDegraded []string + wantErr bool + }{ + { + name: "mixed health states", + selector: []string{"testcluster", "testnode"}, + expectedMetrics: []string{"load", "mem_used", "cpu_user"}, + wantMissing: []string{"cpu_user"}, + wantDegraded: []string{"mem_used"}, + wantErr: false, + }, + { + name: "node not found", + selector: []string{"testcluster", "nonexistent"}, + expectedMetrics: []string{"load"}, + wantMissing: nil, + wantDegraded: nil, + wantErr: true, + }, + { + name: "check only healthy metric", + selector: []string{"testcluster", "testnode"}, + expectedMetrics: []string{"load"}, + wantMissing: []string{}, + wantDegraded: []string{}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + missing, degraded, err := ms.GetHealthyMetrics(tt.selector, tt.expectedMetrics) + + if (err != nil) != tt.wantErr { + t.Errorf("GetHealthyMetrics() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if tt.wantErr { + return + } + + // Check missing list + if len(missing) != len(tt.wantMissing) { + t.Errorf("GetHealthyMetrics() missing = %v, want %v", missing, tt.wantMissing) + } else { + for i, m := range tt.wantMissing { + if missing[i] != m { + t.Errorf("GetHealthyMetrics() missing[%d] = %v, want %v", i, missing[i], m) + } + } + } + + // Check degraded list + if len(degraded) != len(tt.wantDegraded) { + t.Errorf("GetHealthyMetrics() degraded = %v, want %v", degraded, tt.wantDegraded) + } else { + for i, d := range tt.wantDegraded { + if degraded[i] != d { + t.Errorf("GetHealthyMetrics() degraded[%d] = %v, want %v", i, degraded[i], d) + } + } + } + }) + } +} + +// TestBufferHealthChecks tests the buffer-level health check functions +func TestBufferHealthChecks(t *testing.T) { + now := time.Now().Unix() + + tests := []struct { + name string + setupBuffer func() *buffer + wantExists bool + wantHealthy bool + description string + }{ + { + name: "nil buffer", + setupBuffer: func() *buffer { + return nil + }, + wantExists: false, + wantHealthy: false, + description: "nil buffer should not exist and not be healthy", + }, + { + name: "empty buffer", + setupBuffer: func() *buffer { + b := newBuffer(now, 10) + b.data = nil + return b + }, + wantExists: false, + wantHealthy: false, + description: "empty buffer should not exist and not be healthy", + }, + { + name: "healthy buffer with recent data", + setupBuffer: func() *buffer { + b := newBuffer(now-30, 10) + // Write data up to now (within MaxMissingDataPoints * frequency = 50 seconds) + for ts := now - 30; ts <= now; ts += 10 { + b.write(ts, schema.Float(1.0)) + } + return b + }, + wantExists: true, + wantHealthy: true, + description: "buffer with recent data should be healthy", + }, + { + name: "stale buffer beyond threshold", + setupBuffer: func() *buffer { + b := newBuffer(now-200, 10) + // Write data that ends 100 seconds ago (beyond MaxMissingDataPoints * frequency = 50 seconds) + for ts := now - 200; ts <= now-100; ts += 10 { + b.write(ts, schema.Float(1.0)) + } + return b + }, + wantExists: true, + wantHealthy: false, + description: "buffer with stale data should exist but not be healthy", + }, + { + name: "buffer at threshold boundary", + 
setupBuffer: func() *buffer { + b := newBuffer(now-50, 10) + // Write data that ends exactly at threshold (MaxMissingDataPoints * frequency = 50 seconds) + for ts := now - 50; ts <= now-50; ts += 10 { + b.write(ts, schema.Float(1.0)) + } + return b + }, + wantExists: true, + wantHealthy: true, + description: "buffer at threshold boundary should still be healthy", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := tt.setupBuffer() + + exists := b.bufferExists() + if exists != tt.wantExists { + t.Errorf("bufferExists() = %v, want %v: %s", exists, tt.wantExists, tt.description) + } + + if b != nil && b.data != nil && len(b.data) > 0 { + healthy := b.isBufferHealthy() + if healthy != tt.wantHealthy { + t.Errorf("isBufferHealthy() = %v, want %v: %s", healthy, tt.wantHealthy, tt.description) + } + } + }) + } +} From a7a95bb8668bc86e5fdaeb557325a92ef91d02e8 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Wed, 4 Feb 2026 12:57:48 +0100 Subject: [PATCH 16/17] add shortjobs and resource sums to project and user lists --- internal/repository/stats.go | 2 +- web/frontend/src/List.root.svelte | 91 +++++++++++++++++++++++++++++-- 2 files changed, 86 insertions(+), 7 deletions(-) diff --git a/internal/repository/stats.go b/internal/repository/stats.go index af764d46..942d6037 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -466,7 +466,7 @@ func (r *JobRepository) JobCountGrouped( // AddJobCountGrouped augments existing statistics with additional job counts by category. // // This method enriches JobsStatistics returned by JobsStatsGrouped or JobCountGrouped -// with counts of running or short-running jobs, matched by group ID. +// with counts of running or short-running (based on ShortRunningJobsDuration) jobs, matched by group ID. // // Parameters: // - ctx: Context for security checks diff --git a/web/frontend/src/List.root.svelte b/web/frontend/src/List.root.svelte index eb81b4c7..6bc1cd8f 100644 --- a/web/frontend/src/List.root.svelte +++ b/web/frontend/src/List.root.svelte @@ -7,7 +7,7 @@ --> @@ -66,11 +69,21 @@ - + + + {#if subClusters?.length > 1} + {#each subClusters.map(sc => sc.name) as scn} + + + + + + {/each} + {/if} diff --git a/web/frontend/src/status/dashdetails/UsageDash.svelte b/web/frontend/src/status/dashdetails/UsageDash.svelte index 928ef957..79adedc3 100644 --- a/web/frontend/src/status/dashdetails/UsageDash.svelte +++ b/web/frontend/src/status/dashdetails/UsageDash.svelte @@ -3,6 +3,9 @@ Properties: - `presetCluster String`: The cluster to show status information for + - `presetSubCluster String?`: The subCluster to show status information for [Default: null] + - `useCbColors Bool?`: Use colorblind friendly colors [Default: false] + - `useAltColors Bool?`: Use alternative color set [Default: false] -->
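Taken together, the metric-store patches in this series replace the per-node GET /healthcheck handler with the batch MemoryStore.HealthCheck(cluster, nodes, expectedMetrics) call that updateNodeStates now consumes, returning one schema.MonitoringState per hostname. Below is a hedged caller-side sketch of that final API. The cc-backend module path is an assumption, and the cluster, host, and metric names are placeholders taken from the examples in the doc comments; only the HealthCheck signature, GetMemoryStore, and the MonitoringState constants come directly from the diffs.

package main

import (
	"fmt"
	"log"

	// Import paths are assumptions based on the packages touched in this series.
	"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
	"github.com/ClusterCockpit/cc-lib/v2/schema"
)

func main() {
	// The singleton store that the write/debug handlers also use.
	ms := metricstore.GetMemoryStore()

	// Placeholder names, as used in the HealthCheck doc-comment example.
	nodes := []string{"node001", "node002", "node003"}
	expectedMetrics := []string{"load", "mem_used", "cpu_user", "cpu_system"}

	states, err := ms.HealthCheck("emmy", nodes, expectedMetrics)
	if err != nil {
		log.Fatal(err)
	}

	for host, state := range states {
		switch state {
		case schema.MonitoringStateFull:
			fmt.Printf("%s: all expected metrics are fresh\n", host)
		case schema.MonitoringStatePartial:
			fmt.Printf("%s: some expected metrics are stale or missing\n", host)
		case schema.MonitoringStateFailed:
			fmt.Printf("%s: host unknown or no usable metric data\n", host)
		}
	}
}

In the server itself this loop is not needed: as the node.go hunk in PATCH 13 shows, updateNodeStates simply copies the returned map into its healthStates via maps.Copy and stores the per-node state alongside the Slurm node state.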