Prersist faulty nodestate metric lists to db

This commit is contained in:
2026-02-12 08:48:15 +01:00
parent 90c8fbf07c
commit 865cd3db54
6 changed files with 45 additions and 23 deletions

2
go.mod
View File

@@ -124,3 +124,5 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.6.0 // indirect
)
replace github.com/ClusterCockpit/cc-lib/v2 => ../cc-lib

6
go.sum
View File

@@ -4,12 +4,6 @@ github.com/99designs/gqlgen v0.17.85 h1:EkGx3U2FDcxQm8YDLQSpXIAVmpDyZ3IcBMOJi2nH
github.com/99designs/gqlgen v0.17.85/go.mod h1:yvs8s0bkQlRfqg03YXr3eR4OQUowVhODT/tHzCXnbOU=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
github.com/ClusterCockpit/cc-lib/v2 v2.2.2 h1:ye4RY57I19c2cXr3XWZBS/QYYgQVeGFvsiu5HkyKq9E=
github.com/ClusterCockpit/cc-lib/v2 v2.2.2/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw=
github.com/ClusterCockpit/cc-lib/v2 v2.3.0 h1:69NqCAYCU1r2w6J5Yuxoe8jfR68VLqtWwsWXZ6KTOo4=
github.com/ClusterCockpit/cc-lib/v2 v2.3.0/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw=
github.com/ClusterCockpit/cc-lib/v2 v2.4.0 h1:OnZlvqSatg7yCQ2NtSR7AddpUVSiuSMZ8scF1a7nfOk=
github.com/ClusterCockpit/cc-lib/v2 v2.4.0/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=

View File

@@ -80,7 +80,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
ms := metricstore.GetMemoryStore()
m := make(map[string][]string)
healthStates := make(map[string]schema.MonitoringState)
healthResults := make(map[string]metricstore.HealthCheckResult)
startMs := time.Now()
@@ -94,8 +94,8 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
if sc != "" {
metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc)
metricNames := metricListToNames(metricList)
if states, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil {
maps.Copy(healthStates, states)
if results, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil {
maps.Copy(healthResults, results)
}
}
}
@@ -106,8 +106,10 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
for _, node := range req.Nodes {
state := determineState(node.States)
healthState := schema.MonitoringStateFailed
if hs, ok := healthStates[node.Hostname]; ok {
healthState = hs
var healthMetrics string
if result, ok := healthResults[node.Hostname]; ok {
healthState = result.State
healthMetrics = result.HealthMetrics
}
nodeState := schema.NodeStateDB{
TimeStamp: requestReceived,
@@ -116,10 +118,14 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
MemoryAllocated: node.MemoryAllocated,
GpusAllocated: node.GpusAllocated,
HealthState: healthState,
HealthMetrics: healthMetrics,
JobsRunning: node.JobsRunning,
}
repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState)
if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil {
cclog.Errorf("updateNodeStates: updating node state for %s on %s failed: %v",
node.Hostname, req.Cluster, err)
}
}
cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB))

View File

@@ -169,9 +169,10 @@ func (r *NodeRepository) AddNode(node *schema.NodeDB) (int64, error) {
}
const NamedNodeStateInsert string = `
INSERT INTO node_state (time_stamp, node_state, health_state, cpus_allocated,
memory_allocated, gpus_allocated, jobs_running, node_id)
VALUES (:time_stamp, :node_state, :health_state, :cpus_allocated, :memory_allocated, :gpus_allocated, :jobs_running, :node_id);`
INSERT INTO node_state (time_stamp, node_state, health_state, health_metrics,
cpus_allocated, memory_allocated, gpus_allocated, jobs_running, node_id)
VALUES (:time_stamp, :node_state, :health_state, :health_metrics,
:cpus_allocated, :memory_allocated, :gpus_allocated, :jobs_running, :node_id);`
// TODO: Add real Monitoring Health State

View File

@@ -6,6 +6,7 @@
package metricstore
import (
"encoding/json"
"fmt"
"time"
@@ -19,6 +20,13 @@ type HealthCheckResponse struct {
Error error
}
// HealthCheckResult holds the monitoring state and raw JSON health metrics
// for a single node as determined by HealthCheck.
type HealthCheckResult struct {
State schema.MonitoringState
HealthMetrics string // JSON: {"missing":[...],"degraded":[...]}
}
// MaxMissingDataPoints is the threshold for stale data detection.
// A buffer is considered healthy if the gap between its last data point
// and the current time is within MaxMissingDataPoints * frequency.
@@ -134,15 +142,15 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []str
// - MonitoringStateFailed: node not found, or no healthy metrics at all
func (m *MemoryStore) HealthCheck(cluster string,
nodes []string, expectedMetrics []string,
) (map[string]schema.MonitoringState, error) {
results := make(map[string]schema.MonitoringState, len(nodes))
) (map[string]HealthCheckResult, error) {
results := make(map[string]HealthCheckResult, len(nodes))
for _, hostname := range nodes {
selector := []string{cluster, hostname}
degradedList, missingList, err := m.GetHealthyMetrics(selector, expectedMetrics)
if err != nil {
results[hostname] = schema.MonitoringStateFailed
results[hostname] = HealthCheckResult{State: schema.MonitoringStateFailed}
continue
}
@@ -158,13 +166,24 @@ func (m *MemoryStore) HealthCheck(cluster string,
cclog.ComponentInfo("metricstore", "HealthCheck: node ", hostname, "missing metrics:", missingList)
}
var state schema.MonitoringState
switch {
case degradedCount == 0 && missingCount == 0:
results[hostname] = schema.MonitoringStateFull
state = schema.MonitoringStateFull
case healthyCount == 0:
results[hostname] = schema.MonitoringStateFailed
state = schema.MonitoringStateFailed
default:
results[hostname] = schema.MonitoringStatePartial
state = schema.MonitoringStatePartial
}
hm, _ := json.Marshal(map[string][]string{
"missing": missingList,
"degraded": degradedList,
})
results[hostname] = HealthCheckResult{
State: state,
HealthMetrics: string(hm),
}
}

View File

@@ -253,8 +253,8 @@ func TestHealthCheck(t *testing.T) {
// Check status
if wantStatus, ok := tt.wantStates[node]; ok {
if state != wantStatus {
t.Errorf("HealthCheck() node %s status = %v, want %v", node, state, wantStatus)
if state.State != wantStatus {
t.Errorf("HealthCheck() node %s status = %v, want %v", node, state.State, wantStatus)
}
}
}