diff --git a/.gitignore b/.gitignore index db9f922b..67dbb510 100644 --- a/.gitignore +++ b/.gitignore @@ -13,7 +13,7 @@ /var/checkpoints* migrateTimestamps.pl -test_ccms_write_api* +test_ccms_* /web/frontend/public/build /web/frontend/node_modules diff --git a/internal/api/metricstore.go b/internal/api/metricstore.go index d36df4bf..5c15bb2c 100644 --- a/internal/api/metricstore.go +++ b/internal/api/metricstore.go @@ -135,36 +135,3 @@ func debugMetrics(rw http.ResponseWriter, r *http.Request) { return } } - -// handleHealthCheck godoc -// @summary HealthCheck endpoint -// @tags healthcheck -// @description This endpoint allows the users to check if a node is healthy -// @produce json -// @param selector query string false "Selector" -// @success 200 {string} string "Debug dump" -// @failure 400 {object} api.ErrorResponse "Bad Request" -// @failure 401 {object} api.ErrorResponse "Unauthorized" -// @failure 403 {object} api.ErrorResponse "Forbidden" -// @failure 500 {object} api.ErrorResponse "Internal Server Error" -// @security ApiKeyAuth -// @router /healthcheck/ [get] -func metricsHealth(rw http.ResponseWriter, r *http.Request) { - rawCluster := r.URL.Query().Get("cluster") - rawNode := r.URL.Query().Get("node") - - if rawCluster == "" || rawNode == "" { - handleError(errors.New("'cluster' and 'node' are required query parameter"), http.StatusBadRequest, rw) - return - } - - rw.Header().Add("Content-Type", "application/json") - - selector := []string{rawCluster, rawNode} - - ms := metricstore.GetMemoryStore() - if err := ms.HealthCheck(bufio.NewWriter(rw), selector); err != nil { - handleError(err, http.StatusBadRequest, rw) - return - } -} diff --git a/internal/api/nats.go b/internal/api/nats.go index bbbd151f..c0a8c174 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -324,11 +324,12 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) { } repo := repository.GetNodeRepository() + requestReceived := time.Now().Unix() for _, node := range req.Nodes { state := determineState(node.States) nodeState := schema.NodeStateDB{ - TimeStamp: time.Now().Unix(), + TimeStamp: requestReceived, NodeState: state, CpusAllocated: node.CpusAllocated, MemoryAllocated: node.MemoryAllocated, diff --git a/internal/api/node.go b/internal/api/node.go index 4ad5337a..37a8576c 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -7,11 +7,14 @@ package api import ( "fmt" + "maps" "net/http" "strings" "time" "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/metricstore" "github.com/ClusterCockpit/cc-lib/v2/schema" ) @@ -20,6 +23,15 @@ type UpdateNodeStatesRequest struct { Cluster string `json:"cluster" example:"fritz"` } +// metricListToNames converts a map of metric configurations to a list of metric names +func metricListToNames(metricList map[string]*schema.Metric) []string { + names := make([]string, 0, len(metricList)) + for name := range metricList { + names = append(names, name) + } + return names +} + // this routine assumes that only one of them exists per node func determineState(states []string) schema.SchedulerState { for _, state := range states { @@ -62,16 +74,42 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { http.StatusBadRequest, rw) return } + requestReceived := time.Now().Unix() repo := repository.GetNodeRepository() + ms := metricstore.GetMemoryStore() + + m := make(map[string][]string) + healthStates := 
make(map[string]schema.MonitoringState) + + for _, node := range req.Nodes { + if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil { + m[sc] = append(m[sc], node.Hostname) + } + } + + for sc, nl := range m { + if sc != "" { + metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc) + metricNames := metricListToNames(metricList) + if states, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil { + maps.Copy(healthStates, states) + } + } + } for _, node := range req.Nodes { state := determineState(node.States) + healthState := schema.MonitoringStateFailed + if hs, ok := healthStates[node.Hostname]; ok { + healthState = hs + } nodeState := schema.NodeStateDB{ - TimeStamp: time.Now().Unix(), NodeState: state, + TimeStamp: requestReceived, + NodeState: state, CpusAllocated: node.CpusAllocated, MemoryAllocated: node.MemoryAllocated, GpusAllocated: node.GpusAllocated, - HealthState: schema.MonitoringStateFull, + HealthState: healthState, JobsRunning: node.JobsRunning, } diff --git a/internal/api/rest.go b/internal/api/rest.go index 0d52742e..3f6d9609 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -81,7 +81,7 @@ func (api *RestAPI) MountAPIRoutes(r *mux.Router) { // Cluster List r.HandleFunc("/clusters/", api.getClusters).Methods(http.MethodGet) // Slurm node state - r.HandleFunc("/nodestate/", api.updateNodeStates).Methods(http.MethodPost, http.MethodPut) + r.HandleFunc("/nodestates/", api.updateNodeStates).Methods(http.MethodPost, http.MethodPut) // Job Handler if config.Keys.APISubjects == nil { cclog.Info("Enabling REST start/stop job API") @@ -127,12 +127,12 @@ func (api *RestAPI) MountMetricStoreAPIRoutes(r *mux.Router) { r.HandleFunc("/free", freeMetrics).Methods(http.MethodPost) r.HandleFunc("/write", writeMetrics).Methods(http.MethodPost) r.HandleFunc("/debug", debugMetrics).Methods(http.MethodGet) - r.HandleFunc("/healthcheck", metricsHealth).Methods(http.MethodGet) + r.HandleFunc("/healthcheck", api.updateNodeStates).Methods(http.MethodPost) // Same endpoints but with trailing slash r.HandleFunc("/free/", freeMetrics).Methods(http.MethodPost) r.HandleFunc("/write/", writeMetrics).Methods(http.MethodPost) r.HandleFunc("/debug/", debugMetrics).Methods(http.MethodGet) - r.HandleFunc("/healthcheck/", metricsHealth).Methods(http.MethodGet) + r.HandleFunc("/healthcheck/", api.updateNodeStates).Methods(http.MethodPost) } // MountConfigAPIRoutes registers configuration and user management endpoints. 
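
For orientation, a hedged client-side sketch of the relocated /nodestates/ route registered above. The example host, the /api mount prefix, the per-node JSON fields, and the X-Auth-Token header are assumptions that are not visible in this diff; only the route name, methods, and UpdateNodeStatesRequest.Cluster come from the patch.

// Illustrative client sketch (not part of the patch): exercising the renamed
// /nodestates/ route. Mount prefix, node field names, and auth header are
// assumptions; adjust them to the actual deployment.
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Payload shaped after UpdateNodeStatesRequest; the per-node fields below
	// are guesses based on how req.Nodes is consumed in updateNodeStates.
	body := []byte(`{
	  "cluster": "fritz",
	  "nodes": [
	    { "hostname": "node001", "states": ["IDLE"], "cpusAllocated": 0, "jobsRunning": 0 }
	  ]
	}`)

	req, err := http.NewRequest(http.MethodPost, "https://ccbackend.example.org/api/nodestates/", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Auth-Token", "<token>") // placeholder; auth is not shown in this diff

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
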
diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index d7d6b675..19d04eab 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -923,15 +923,19 @@ func (r *queryResolver) ClusterMetrics(ctx context.Context, cluster string, metr if !okData && len(ser.Data) != 0 { collectorData[metric] = make([]schema.Float, len(ser.Data)) } else if !okData { - cclog.Debugf("ClusterMetrics Skip Init: No Data -> %s at %s; Size %d", metric, ser.Hostname, len(ser.Data)) + cclog.Debugf("[SCHEMARESOLVER] clusterMetrics skip init: no data -> %s at %s; size %d", metric, ser.Hostname, len(ser.Data)) } // Sum if init'd and matching size if okData && len(ser.Data) == len(collectorData[metric]) { for i, val := range ser.Data { - collectorData[metric][i] += val + if val.IsNaN() { + continue + } else { + collectorData[metric][i] += val + } } } else if okData { - cclog.Debugf("ClusterMetrics Skip Sum: Data Diff -> %s at %s; Want Size %d, Have Size %d", metric, ser.Hostname, len(collectorData[metric]), len(ser.Data)) + cclog.Debugf("[SCHEMARESOLVER] clusterMetrics skip sum: data diff -> %s at %s; want size %d, have size %d", metric, ser.Hostname, len(collectorData[metric]), len(ser.Data)) } } } diff --git a/internal/repository/stats.go b/internal/repository/stats.go index af764d46..942d6037 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -466,7 +466,7 @@ func (r *JobRepository) JobCountGrouped( // AddJobCountGrouped augments existing statistics with additional job counts by category. // // This method enriches JobsStatistics returned by JobsStatsGrouped or JobCountGrouped -// with counts of running or short-running jobs, matched by group ID. +// with counts of running or short-running (based on ShortRunningJobsDuration) jobs, matched by group ID. // // Parameters: // - ctx: Context for security checks diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index cab4c24f..784348b5 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -158,8 +158,7 @@ func cleanupCheckpoints(dir string, cleanupDir string, from int64, deleteInstead return 0, err } - extension := Keys.Checkpoints.FileFormat - files, err := findFiles(entries, from, extension, false) + files, err := findFiles(entries, from, false) if err != nil { return 0, err } diff --git a/pkg/metricstore/checkpoint.go b/pkg/metricstore/checkpoint.go index 715566e4..b4097ff2 100644 --- a/pkg/metricstore/checkpoint.go +++ b/pkg/metricstore/checkpoint.go @@ -415,7 +415,7 @@ func enqueueCheckpointHosts(dir string, work chan<- [2]string) error { // // Uses worker pool to load cluster/host combinations. Periodically triggers GC // to prevent excessive heap growth. Returns number of files loaded and any errors. 
-func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) { +func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) { var wg sync.WaitGroup work := make(chan [2]string, Keys.NumWorkers*4) n, errs := int32(0), int32(0) @@ -426,7 +426,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) ( defer wg.Done() for host := range work { lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics)) - nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from, extension) + nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from) if err != nil { cclog.Errorf("[METRICSTORE]> error while loading checkpoints for %s/%s: %s", host[0], host[1], err.Error()) atomic.AddInt32(&errs, 1) @@ -465,57 +465,7 @@ func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) { cclog.Debugf("[METRICSTORE]> %#v Directory created successfully", dir) } - // Config read (replace with your actual config read) - fileFormat := Keys.Checkpoints.FileFormat - if fileFormat == "" { - fileFormat = "avro" - } - - // Map to easily get the fallback format - oppositeFormat := map[string]string{ - "json": "avro", - "avro": "json", - } - - // First, attempt to load the specified format - if found, err := checkFilesWithExtension(dir, fileFormat); err != nil { - return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err) - } else if found { - cclog.Infof("[METRICSTORE]> Loading %s files because fileformat is %s", fileFormat, fileFormat) - return m.FromCheckpoint(dir, from, fileFormat) - } - - // If not found, attempt the opposite format - altFormat := oppositeFormat[fileFormat] - if found, err := checkFilesWithExtension(dir, altFormat); err != nil { - return 0, fmt.Errorf("[METRICSTORE]> error checking files with extension: %v", err) - } else if found { - cclog.Infof("[METRICSTORE]> Loading %s files but fileformat is %s", altFormat, fileFormat) - return m.FromCheckpoint(dir, from, altFormat) - } - - return 0, nil -} - -// checkFilesWithExtension walks a directory tree to check if files with the given extension exist. 
-func checkFilesWithExtension(dir string, extension string) (bool, error) { - found := false - - err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return fmt.Errorf("[METRICSTORE]> error accessing path %s: %v", path, err) - } - if !info.IsDir() && filepath.Ext(info.Name()) == "."+extension { - found = true - return nil - } - return nil - }) - if err != nil { - return false, fmt.Errorf("[METRICSTORE]> error walking through directories: %s", err) - } - - return found, nil + return m.FromCheckpoint(dir, from) } func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error { @@ -729,7 +679,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error { return nil } -func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension string) (int, error) { +func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64) (int, error) { direntries, err := os.ReadDir(dir) if err != nil { if os.IsNotExist(err) { @@ -748,33 +698,38 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension children: make(map[string]*Level), } - files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from, extension) + files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from) filesLoaded += files if err != nil { return filesLoaded, err } l.children[e.Name()] = child - } else if strings.HasSuffix(e.Name(), "."+extension) { + } else if strings.HasSuffix(e.Name(), ".json") || strings.HasSuffix(e.Name(), ".avro") { allFiles = append(allFiles, e) } else { continue } } - files, err := findFiles(allFiles, from, extension, true) + files, err := findFiles(allFiles, from, true) if err != nil { return filesLoaded, err } loaders := map[string]func(*MemoryStore, *os.File, int64) error{ - "json": l.loadJSONFile, - "avro": l.loadAvroFile, + ".json": l.loadJSONFile, + ".avro": l.loadAvroFile, } - loader := loaders[extension] - for _, filename := range files { + ext := filepath.Ext(filename) + loader := loaders[ext] + if loader == nil { + cclog.Warnf("Unknown extension for file %s", filename) + continue + } + // Use a closure to ensure file is closed immediately after use err := func() error { f, err := os.Open(path.Join(dir, filename)) @@ -798,10 +753,12 @@ func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension // This will probably get very slow over time! // A solution could be some sort of an index file in which all other files // and the timespan they contain is listed. -func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRecentFiles bool) ([]string, error) { +// NOTE: This now assumes that you have distinct timestamps for json and avro files +// Also, it assumes that the timestamps are not overlapping/self-modified. +func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) { nums := map[string]int64{} for _, e := range direntries { - if !strings.HasSuffix(e.Name(), "."+extension) { + if !strings.HasSuffix(e.Name(), ".json") && !strings.HasSuffix(e.Name(), ".avro") { continue } diff --git a/pkg/metricstore/healthcheck.go b/pkg/metricstore/healthcheck.go index 2a49c47a..f390749d 100644 --- a/pkg/metricstore/healthcheck.go +++ b/pkg/metricstore/healthcheck.go @@ -6,87 +6,260 @@ package metricstore import ( - "bufio" + "cmp" "fmt" + "slices" "time" + + "github.com/ClusterCockpit/cc-lib/v2/schema" ) +// HealthCheckResponse represents the result of a health check operation. 
+// +// Status indicates the monitoring state (Full, Partial, Failed). +// Error contains any error encountered during the health check. +type HealthCheckResponse struct { + Status schema.MonitoringState + Error error +} + // MaxMissingDataPoints is a threshold that allows a node to be healthy with certain number of data points missing. // Suppose a node does not receive last 5 data points, then healthCheck endpoint will still say a // node is healthy. Anything more than 5 missing points in metrics of the node will deem the node unhealthy. const MaxMissingDataPoints int64 = 5 -// MaxUnhealthyMetrics is a threshold which allows upto certain number of metrics in a node to be unhealthly. -// Works with MaxMissingDataPoints. Say 5 metrics (including submetrics) do not receive the last -// MaxMissingDataPoints data points, then the node will be deemed healthy. Any more metrics that does -// not receive data for MaxMissingDataPoints data points will deem the node unhealthy. -const MaxUnhealthyMetrics int64 = 5 - -func (b *buffer) healthCheck() int64 { +// isBufferHealthy checks if a buffer has received data for the last MaxMissingDataPoints. +// +// Returns true if the buffer is healthy (recent data within threshold), false otherwise. +// A nil buffer or empty buffer is considered unhealthy. +func (b *buffer) bufferExists() bool { // Check if the buffer is empty - if b.data == nil { - return 1 + if b == nil || b.data == nil || len(b.data) == 0 { + return false } + return true +} + +// isBufferHealthy checks if a buffer has received data for the last MaxMissingDataPoints. +// +// Returns true if the buffer is healthy (recent data within threshold), false otherwise. +// A nil buffer or empty buffer is considered unhealthy. +func (b *buffer) isBufferHealthy() bool { + // Get the last endtime of the buffer bufferEnd := b.start + b.frequency*int64(len(b.data)) t := time.Now().Unix() - // Check if the buffer is too old + // Check if the buffer has recent data (within MaxMissingDataPoints threshold) if t-bufferEnd > MaxMissingDataPoints*b.frequency { - return 1 + return false } - return 0 + return true } -func (l *Level) healthCheck(m *MemoryStore, count int64) (int64, error) { +// MergeUniqueSorted merges two lists, sorts them, and removes duplicates. +// Requires 'cmp.Ordered' because we need to sort the data. +func mergeList[string cmp.Ordered](list1, list2 []string) []string { + // 1. Combine both lists + result := append(list1, list2...) + + // 2. Sort the combined list + slices.Sort(result) + + // 3. Compact removes consecutive duplicates (standard in Go 1.21+) + // e.g. [1, 1, 2, 3, 3] -> [1, 2, 3] + result = slices.Compact(result) + + return result +} + +// getHealthyMetrics recursively collects healthy and degraded metrics at this level and below. +// +// A metric is considered: +// - Healthy: buffer has recent data within MaxMissingDataPoints threshold AND has few/no NaN values +// - Degraded: buffer exists and has recent data, but contains more than MaxMissingDataPoints NaN values +// +// This routine walks the entire subtree starting from the current level. 
+// +// Parameters: +// - m: MemoryStore containing the global metric configuration +// +// Returns: +// - []string: Flat list of healthy metric names from this level and all children +// - []string: Flat list of degraded metric names (exist but have too many missing values) +// - error: Non-nil only for internal errors during recursion +// +// The routine mirrors healthCheck() but provides more granular classification: +// - healthCheck() finds problems (stale/missing) +// - getHealthyMetrics() separates healthy from degraded metrics +func (l *Level) getHealthyMetrics(m *MemoryStore, expectedMetrics []string) ([]string, []string, error) { l.lock.RLock() defer l.lock.RUnlock() - for _, mc := range m.Metrics { - if b := l.metrics[mc.offset]; b != nil { - count += b.healthCheck() + globalMetrics := m.Metrics + + missingList := make([]string, 0) + degradedList := make([]string, 0) + + // Phase 1: Check metrics at this level + for _, metricName := range expectedMetrics { + offset := globalMetrics[metricName].offset + b := l.metrics[offset] + + if !b.bufferExists() { + missingList = append(missingList, metricName) + } else if !b.isBufferHealthy() { + degradedList = append(degradedList, metricName) } } + // Phase 2: Recursively check child levels for _, lvl := range l.children { - c, err := lvl.healthCheck(m, 0) + childMissing, childDegraded, err := lvl.getHealthyMetrics(m, expectedMetrics) if err != nil { - return 0, err + return nil, nil, err } - count += c + + missingList = mergeList(missingList, childMissing) + degradedList = mergeList(degradedList, childDegraded) } - return count, nil + return missingList, degradedList, nil } -func (m *MemoryStore) HealthCheck(w *bufio.Writer, selector []string) error { +// GetHealthyMetrics returns healthy and degraded metrics for a specific node as flat lists. +// +// This routine walks the metric tree starting from the specified node selector +// and collects all metrics that have received data within the last MaxMissingDataPoints +// (default: 5 data points). Metrics are classified into two categories: +// +// - Healthy: Buffer has recent data AND contains few/no NaN (missing) values +// - Degraded: Buffer has recent data BUT contains more than MaxMissingDataPoints NaN values +// +// The returned lists include both node-level metrics (e.g., "load", "mem_used") and +// hardware-level metrics (e.g., "cpu_user", "gpu_temp") in flat slices. +// +// Parameters: +// - selector: Hierarchical path to the target node, typically []string{cluster, hostname}. +// Example: []string{"emmy", "node001"} navigates to the "node001" host in the "emmy" cluster. +// The selector must match the hierarchy used during metric ingestion. +// +// Returns: +// - []string: Flat list of healthy metric names (recent data, few missing values) +// - []string: Flat list of degraded metric names (recent data, many missing values) +// - error: Non-nil if the node is not found or internal errors occur +// +// Example usage: +// +// selector := []string{"emmy", "node001"} +// healthyMetrics, degradedMetrics, err := ms.GetHealthyMetrics(selector) +// if err != nil { +// // Node not found or internal error +// return err +// } +// fmt.Printf("Healthy metrics: %v\n", healthyMetrics) +// // Output: ["load", "mem_used", "cpu_user", ...] +// fmt.Printf("Degraded metrics: %v\n", degradedMetrics) +// // Output: ["gpu_temp", "network_rx", ...] 
(metrics with many NaN values) +// +// Note: This routine provides more granular classification than HealthCheck: +// - HealthCheck reports stale/missing metrics (problems) +// - GetHealthyMetrics separates fully healthy from degraded metrics (quality levels) +func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []string) ([]string, []string, error) { lvl := m.root.findLevel(selector) if lvl == nil { - return fmt.Errorf("[METRICSTORE]> not found: %#v", selector) + return nil, nil, fmt.Errorf("[METRICSTORE]> error while GetHealthyMetrics, host not found: %#v", selector) } - buf := make([]byte, 0, 25) - // buf = append(buf, "{"...) - - var count int64 = 0 - - unhealthyMetricsCount, err := lvl.healthCheck(m, count) + missingList, degradedList, err := lvl.getHealthyMetrics(m, expectedMetrics) if err != nil { - return err + return nil, nil, err } - if unhealthyMetricsCount < MaxUnhealthyMetrics { - buf = append(buf, "Healthy"...) - } else { - buf = append(buf, "Unhealthy"...) - } - - // buf = append(buf, "}\n"...) - - if _, err = w.Write(buf); err != nil { - return err - } - - return w.Flush() + return missingList, degradedList, nil +} + +// HealthCheck performs health checks on multiple nodes and returns their monitoring states. +// +// This routine provides a batch health check interface that evaluates multiple nodes +// against a specific set of expected metrics. For each node, it determines the overall +// monitoring state based on which metrics are healthy, degraded, or missing. +// +// Health Status Classification: +// - MonitoringStateFull: All expected metrics are healthy (recent data, few missing values) +// - MonitoringStatePartial: Some metrics are degraded (many missing values) or missing +// - MonitoringStateFailed: Node not found or all expected metrics are missing/stale +// +// Parameters: +// - cluster: Cluster name (first element of selector path) +// - nodes: List of node hostnames to check +// - expectedMetrics: List of metric names that should be present on each node +// +// Returns: +// - map[string]schema.MonitoringState: Map keyed by hostname containing monitoring state for each node +// - error: Non-nil only for internal errors (individual node failures are captured as MonitoringStateFailed) +// +// Example usage: +// +// cluster := "emmy" +// nodes := []string{"node001", "node002", "node003"} +// expectedMetrics := []string{"load", "mem_used", "cpu_user", "cpu_system"} +// healthStates, err := ms.HealthCheck(cluster, nodes, expectedMetrics) +// if err != nil { +// return err +// } +// for hostname, state := range healthStates { +// fmt.Printf("Node %s: %s\n", hostname, state) +// } +// +// Note: This routine is optimized for batch operations where you need to check +// the same set of metrics across multiple nodes. 
+func (m *MemoryStore) HealthCheck(cluster string, + nodes []string, expectedMetrics []string, +) (map[string]schema.MonitoringState, error) { + results := make(map[string]schema.MonitoringState, len(nodes)) + + // Create a set of expected metrics for fast lookup + expectedSet := make(map[string]bool, len(expectedMetrics)) + for _, metric := range expectedMetrics { + expectedSet[metric] = true + } + + // Check each node + for _, hostname := range nodes { + selector := []string{cluster, hostname} + status := schema.MonitoringStateFull + healthyCount := 0 + degradedCount := 0 + missingCount := 0 + + // Get healthy and degraded metrics for this node + missingList, degradedList, err := m.GetHealthyMetrics(selector, expectedMetrics) + if err != nil { + // Node not found or internal error + results[hostname] = schema.MonitoringStateFailed + continue + } + + missingCount = len(missingList) + degradedCount = len(degradedList) + healthyCount = len(expectedMetrics) - (missingCount + degradedCount) + + // Determine overall health status + if missingCount > 0 || degradedCount > 0 { + if healthyCount == 0 { + // No healthy metrics at all + status = schema.MonitoringStateFailed + } else { + // Some healthy, some degraded/missing + status = schema.MonitoringStatePartial + } + } + // else: all metrics healthy, status remains MonitoringStateFull + + results[hostname] = status + } + + return results, nil } diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go index 90cec2bd..f96f49a2 100644 --- a/pkg/metricstore/metricstore_test.go +++ b/pkg/metricstore/metricstore_test.go @@ -7,6 +7,7 @@ package metricstore import ( "testing" + "time" "github.com/ClusterCockpit/cc-lib/v2/schema" ) @@ -88,3 +89,378 @@ func TestBufferRead(t *testing.T) { t.Errorf("buffer.read() len(result) = %d, want 3", len(result)) } } + +func TestHealthCheck(t *testing.T) { + // Create a test MemoryStore with some metrics + metrics := map[string]MetricConfig{ + "load": {Frequency: 10, Aggregation: AvgAggregation, offset: 0}, + "mem_used": {Frequency: 10, Aggregation: AvgAggregation, offset: 1}, + "cpu_user": {Frequency: 10, Aggregation: AvgAggregation, offset: 2}, + "cpu_system": {Frequency: 10, Aggregation: AvgAggregation, offset: 3}, + } + + ms := &MemoryStore{ + Metrics: metrics, + root: Level{ + metrics: make([]*buffer, len(metrics)), + children: make(map[string]*Level), + }, + } + + // Use recent timestamps (current time minus a small offset) + now := time.Now().Unix() + startTime := now - 100 // Start 100 seconds ago to have enough data points + + // Setup test data for node001 - all metrics healthy (recent data) + node001 := ms.root.findLevelOrCreate([]string{"testcluster", "node001"}, len(metrics)) + for i := 0; i < len(metrics); i++ { + node001.metrics[i] = newBuffer(startTime, 10) + // Write recent data up to now + for ts := startTime; ts <= now; ts += 10 { + node001.metrics[i].write(ts, schema.Float(float64(i+1))) + } + } + + // Setup test data for node002 - some metrics stale (old data beyond MaxMissingDataPoints threshold) + node002 := ms.root.findLevelOrCreate([]string{"testcluster", "node002"}, len(metrics)) + // MaxMissingDataPoints = 5, frequency = 10, so threshold is 50 seconds + staleTime := now - 100 // Data ends 100 seconds ago (well beyond 50 second threshold) + for i := 0; i < len(metrics); i++ { + node002.metrics[i] = newBuffer(staleTime-50, 10) + if i < 2 { + // First two metrics: healthy (recent data) + for ts := startTime; ts <= now; ts += 10 { + node002.metrics[i].write(ts, 
schema.Float(float64(i+1))) + } + } else { + // Last two metrics: stale (data ends 100 seconds ago) + for ts := staleTime - 50; ts <= staleTime; ts += 10 { + node002.metrics[i].write(ts, schema.Float(float64(i+1))) + } + } + } + + // Setup test data for node003 - some metrics missing (no buffer) + node003 := ms.root.findLevelOrCreate([]string{"testcluster", "node003"}, len(metrics)) + // Only create buffers for first two metrics + for i := 0; i < 2; i++ { + node003.metrics[i] = newBuffer(startTime, 10) + for ts := startTime; ts <= now; ts += 10 { + node003.metrics[i].write(ts, schema.Float(float64(i+1))) + } + } + // Leave metrics[2] and metrics[3] as nil (missing) + + // Setup test data for node005 - all metrics stale + node005 := ms.root.findLevelOrCreate([]string{"testcluster", "node005"}, len(metrics)) + for i := 0; i < len(metrics); i++ { + node005.metrics[i] = newBuffer(staleTime-50, 10) + // All metrics have stale data (ends 100 seconds ago) + for ts := staleTime - 50; ts <= staleTime; ts += 10 { + node005.metrics[i].write(ts, schema.Float(float64(i+1))) + } + } + + // node004 doesn't exist at all + + tests := []struct { + name string + cluster string + nodes []string + expectedMetrics []string + wantStates map[string]schema.MonitoringState + }{ + { + name: "all metrics healthy", + cluster: "testcluster", + nodes: []string{"node001"}, + expectedMetrics: []string{"load", "mem_used", "cpu_user", "cpu_system"}, + wantStates: map[string]schema.MonitoringState{ + "node001": schema.MonitoringStateFull, + }, + }, + { + name: "some metrics stale", + cluster: "testcluster", + nodes: []string{"node002"}, + expectedMetrics: []string{"load", "mem_used", "cpu_user", "cpu_system"}, + wantStates: map[string]schema.MonitoringState{ + "node002": schema.MonitoringStatePartial, + }, + }, + { + name: "some metrics missing", + cluster: "testcluster", + nodes: []string{"node003"}, + expectedMetrics: []string{"load", "mem_used", "cpu_user", "cpu_system"}, + wantStates: map[string]schema.MonitoringState{ + "node003": schema.MonitoringStatePartial, + }, + }, + { + name: "node not found", + cluster: "testcluster", + nodes: []string{"node004"}, + expectedMetrics: []string{"load", "mem_used", "cpu_user", "cpu_system"}, + wantStates: map[string]schema.MonitoringState{ + "node004": schema.MonitoringStateFailed, + }, + }, + { + name: "all metrics stale", + cluster: "testcluster", + nodes: []string{"node005"}, + expectedMetrics: []string{"load", "mem_used", "cpu_user", "cpu_system"}, + wantStates: map[string]schema.MonitoringState{ + "node005": schema.MonitoringStateFailed, + }, + }, + { + name: "multiple nodes mixed states", + cluster: "testcluster", + nodes: []string{"node001", "node002", "node003", "node004", "node005"}, + expectedMetrics: []string{"load", "mem_used"}, + wantStates: map[string]schema.MonitoringState{ + "node001": schema.MonitoringStateFull, + "node002": schema.MonitoringStateFull, // Only checking first 2 metrics which are healthy + "node003": schema.MonitoringStateFull, // Only checking first 2 metrics which exist + "node004": schema.MonitoringStateFailed, // Node doesn't exist + "node005": schema.MonitoringStateFailed, // Both metrics are stale + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + results, err := ms.HealthCheck(tt.cluster, tt.nodes, tt.expectedMetrics) + if err != nil { + t.Errorf("HealthCheck() error = %v", err) + return + } + + // Check that we got results for all nodes + if len(results) != len(tt.nodes) { + t.Errorf("HealthCheck() returned %d 
results, want %d", len(results), len(tt.nodes)) + } + + // Check each node's state + for _, node := range tt.nodes { + state, ok := results[node] + if !ok { + t.Errorf("HealthCheck() missing result for node %s", node) + continue + } + + // Check status + if wantStatus, ok := tt.wantStates[node]; ok { + if state != wantStatus { + t.Errorf("HealthCheck() node %s status = %v, want %v", node, state, wantStatus) + } + } + } + }) + } +} + +// TestGetHealthyMetrics tests the GetHealthyMetrics function which returns lists of missing and degraded metrics +func TestGetHealthyMetrics(t *testing.T) { + metrics := map[string]MetricConfig{ + "load": {Frequency: 10, Aggregation: AvgAggregation, offset: 0}, + "mem_used": {Frequency: 10, Aggregation: AvgAggregation, offset: 1}, + "cpu_user": {Frequency: 10, Aggregation: AvgAggregation, offset: 2}, + } + + ms := &MemoryStore{ + Metrics: metrics, + root: Level{ + metrics: make([]*buffer, len(metrics)), + children: make(map[string]*Level), + }, + } + + now := time.Now().Unix() + startTime := now - 100 + staleTime := now - 100 + + // Setup node with mixed health states + node := ms.root.findLevelOrCreate([]string{"testcluster", "testnode"}, len(metrics)) + + // Metric 0 (load): healthy - recent data + node.metrics[0] = newBuffer(startTime, 10) + for ts := startTime; ts <= now; ts += 10 { + node.metrics[0].write(ts, schema.Float(1.0)) + } + + // Metric 1 (mem_used): degraded - stale data + node.metrics[1] = newBuffer(staleTime-50, 10) + for ts := staleTime - 50; ts <= staleTime; ts += 10 { + node.metrics[1].write(ts, schema.Float(2.0)) + } + + // Metric 2 (cpu_user): missing - no buffer (nil) + + tests := []struct { + name string + selector []string + expectedMetrics []string + wantMissing []string + wantDegraded []string + wantErr bool + }{ + { + name: "mixed health states", + selector: []string{"testcluster", "testnode"}, + expectedMetrics: []string{"load", "mem_used", "cpu_user"}, + wantMissing: []string{"cpu_user"}, + wantDegraded: []string{"mem_used"}, + wantErr: false, + }, + { + name: "node not found", + selector: []string{"testcluster", "nonexistent"}, + expectedMetrics: []string{"load"}, + wantMissing: nil, + wantDegraded: nil, + wantErr: true, + }, + { + name: "check only healthy metric", + selector: []string{"testcluster", "testnode"}, + expectedMetrics: []string{"load"}, + wantMissing: []string{}, + wantDegraded: []string{}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + missing, degraded, err := ms.GetHealthyMetrics(tt.selector, tt.expectedMetrics) + + if (err != nil) != tt.wantErr { + t.Errorf("GetHealthyMetrics() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if tt.wantErr { + return + } + + // Check missing list + if len(missing) != len(tt.wantMissing) { + t.Errorf("GetHealthyMetrics() missing = %v, want %v", missing, tt.wantMissing) + } else { + for i, m := range tt.wantMissing { + if missing[i] != m { + t.Errorf("GetHealthyMetrics() missing[%d] = %v, want %v", i, missing[i], m) + } + } + } + + // Check degraded list + if len(degraded) != len(tt.wantDegraded) { + t.Errorf("GetHealthyMetrics() degraded = %v, want %v", degraded, tt.wantDegraded) + } else { + for i, d := range tt.wantDegraded { + if degraded[i] != d { + t.Errorf("GetHealthyMetrics() degraded[%d] = %v, want %v", i, degraded[i], d) + } + } + } + }) + } +} + +// TestBufferHealthChecks tests the buffer-level health check functions +func TestBufferHealthChecks(t *testing.T) { + now := time.Now().Unix() + + tests := []struct 
{ + name string + setupBuffer func() *buffer + wantExists bool + wantHealthy bool + description string + }{ + { + name: "nil buffer", + setupBuffer: func() *buffer { + return nil + }, + wantExists: false, + wantHealthy: false, + description: "nil buffer should not exist and not be healthy", + }, + { + name: "empty buffer", + setupBuffer: func() *buffer { + b := newBuffer(now, 10) + b.data = nil + return b + }, + wantExists: false, + wantHealthy: false, + description: "empty buffer should not exist and not be healthy", + }, + { + name: "healthy buffer with recent data", + setupBuffer: func() *buffer { + b := newBuffer(now-30, 10) + // Write data up to now (within MaxMissingDataPoints * frequency = 50 seconds) + for ts := now - 30; ts <= now; ts += 10 { + b.write(ts, schema.Float(1.0)) + } + return b + }, + wantExists: true, + wantHealthy: true, + description: "buffer with recent data should be healthy", + }, + { + name: "stale buffer beyond threshold", + setupBuffer: func() *buffer { + b := newBuffer(now-200, 10) + // Write data that ends 100 seconds ago (beyond MaxMissingDataPoints * frequency = 50 seconds) + for ts := now - 200; ts <= now-100; ts += 10 { + b.write(ts, schema.Float(1.0)) + } + return b + }, + wantExists: true, + wantHealthy: false, + description: "buffer with stale data should exist but not be healthy", + }, + { + name: "buffer at threshold boundary", + setupBuffer: func() *buffer { + b := newBuffer(now-50, 10) + // Write data that ends exactly at threshold (MaxMissingDataPoints * frequency = 50 seconds) + for ts := now - 50; ts <= now-50; ts += 10 { + b.write(ts, schema.Float(1.0)) + } + return b + }, + wantExists: true, + wantHealthy: true, + description: "buffer at threshold boundary should still be healthy", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := tt.setupBuffer() + + exists := b.bufferExists() + if exists != tt.wantExists { + t.Errorf("bufferExists() = %v, want %v: %s", exists, tt.wantExists, tt.description) + } + + if b != nil && b.data != nil && len(b.data) > 0 { + healthy := b.isBufferHealthy() + if healthy != tt.wantHealthy { + t.Errorf("isBufferHealthy() = %v, want %v: %s", healthy, tt.wantHealthy, tt.description) + } + } + }) + } +} diff --git a/web/frontend/src/List.root.svelte b/web/frontend/src/List.root.svelte index 108c42dd..6bc1cd8f 100644 --- a/web/frontend/src/List.root.svelte +++ b/web/frontend/src/List.root.svelte @@ -7,7 +7,7 @@ --> - + {#if displayTitle} diff --git a/web/frontend/src/generic/joblist/JobListRow.svelte b/web/frontend/src/generic/joblist/JobListRow.svelte index 17a160e1..9502a2f8 100644 --- a/web/frontend/src/generic/joblist/JobListRow.svelte +++ b/web/frontend/src/generic/joblist/JobListRow.svelte @@ -79,6 +79,7 @@ /* Derived */ const jobId = $derived(job?.id); + const refinedData = $derived($metricsQuery?.data?.jobMetrics ? 
sortAndSelectScope($metricsQuery.data.jobMetrics) : []); const scopes = $derived.by(() => { if (job.numNodes == 1) { if (job.numAcc >= 1) return ["core", "accelerator"]; @@ -202,40 +203,45 @@ /> {/if} - {#each sortAndSelectScope($metricsQuery.data.jobMetrics) as metric, i (metric?.name || i)} + {#each refinedData as metric, i (metric?.name || i)} - - {#if metric.disabled == false && metric.data} - handleZoom(detail, metric.data.name)} - height={plotHeight} - timestep={metric.data.metric.timestep} - scope={metric.data.scope} - series={metric.data.metric.series} - statisticsSeries={metric.data.metric.statisticsSeries} - metric={metric.data.name} - cluster={cluster.find((c) => c.name == job.cluster)} - subCluster={job.subCluster} - isShared={job.shared != "none"} - numhwthreads={job.numHWThreads} - numaccs={job.numAcc} - zoomState={zoomStates[metric.data.name] || null} - thresholdState={thresholdStates[metric.data.name] || null} - /> - {:else if metric.disabled == true && metric.data} - Metric disabled for subcluster {metric.data.name}:{job.subCluster} - {:else} - -

-                No dataset(s) returned for {metrics[i]}
-                Metric or host was not found in metric store for cluster {job.cluster}:
-                Identical messages in {metrics[i]} column: Metric not found.
-                Identical messages in job {job.jobId} row: Host not found.
- {/if} + {#key metric} + {#if metric?.data} + {#if metric?.disabled} + + Metric {metric.data.name}: Disabled for subcluster {job.subCluster} + + {:else} + handleZoom(detail, metric.data.name)} + height={plotHeight} + timestep={metric.data.metric.timestep} + scope={metric.data.scope} + series={metric.data.metric.series} + statisticsSeries={metric.data.metric.statisticsSeries} + metric={metric.data.name} + cluster={cluster.find((c) => c.name == job.cluster)} + subCluster={job.subCluster} + isShared={job.shared != "none"} + numhwthreads={job.numHWThreads} + numaccs={job.numAcc} + zoomState={zoomStates[metric.data.name] || null} + thresholdState={thresholdStates[metric.data.name] || null} + /> + {/if} + {:else} + +

+                  No dataset(s) returned for {metrics[i]}
+                  Metric or host was not found in metric store for cluster {job.cluster}:
+                  Identical messages in {metrics[i]} column: Metric not found.
+                  Identical messages in job {job.jobId} row: Host not found.
+ {/if} + {/key} + + {:else} + + No metrics selected for display. {/each} {/if} diff --git a/web/frontend/src/generic/plots/DoubleMetricPlot.svelte b/web/frontend/src/generic/plots/DoubleMetricPlot.svelte index 10e01311..f3cea881 100644 --- a/web/frontend/src/generic/plots/DoubleMetricPlot.svelte +++ b/web/frontend/src/generic/plots/DoubleMetricPlot.svelte @@ -79,7 +79,7 @@ // X let pendingSeries = [ { - label: "Runtime", + label: "Time", value: (u, ts, sidx, didx) => (didx == null) ? null : formatDurationTime(ts, forNode), } diff --git a/web/frontend/src/status/DashDetails.svelte b/web/frontend/src/status/DashDetails.svelte index 410d8df4..9b6dda56 100644 --- a/web/frontend/src/status/DashDetails.svelte +++ b/web/frontend/src/status/DashDetails.svelte @@ -34,6 +34,9 @@ /*Const Init */ const { query: initq } = init(); const useCbColors = getContext("cc-config")?.plotConfiguration_colorblindMode || false + + /* Derived */ + const subClusters = $derived($initq?.data?.clusters?.find((c) => c.name == presetCluster)?.subClusters || []); @@ -66,11 +69,21 @@ - + + + {#if subClusters?.length > 1} + {#each subClusters.map(sc => sc.name) as scn} + + + + + + {/each} + {/if} diff --git a/web/frontend/src/status/dashdetails/UsageDash.svelte b/web/frontend/src/status/dashdetails/UsageDash.svelte index 928ef957..79adedc3 100644 --- a/web/frontend/src/status/dashdetails/UsageDash.svelte +++ b/web/frontend/src/status/dashdetails/UsageDash.svelte @@ -3,6 +3,9 @@ Properties: - `presetCluster String`: The cluster to show status information for + - `presetSubCluster String?`: The subCluster to show status information for [Default: null] + - `useCbColors Bool?`: Use colorblind friendly colors [Default: false] + - `useAltColors Bool?`: Use alternative color set [Default: false] -->
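
A minimal Go sketch of how the batch HealthCheck added in pkg/metricstore/healthcheck.go can be combined with archive.GetMetricConfigSubCluster, mirroring the flow in updateNodeStates. The package name, function name, and fall-back behaviour are illustrative assumptions; the called APIs (GetMemoryStore, GetMetricConfigSubCluster, HealthCheck, MonitoringStateFailed) are the ones introduced or used in this patch.

// Sketch (illustrative, not part of the patch): deriving a node's health
// state outside the REST handler, mirroring what updateNodeStates does.
package example

import (
	"github.com/ClusterCockpit/cc-backend/pkg/archive"
	"github.com/ClusterCockpit/cc-backend/pkg/metricstore"
	"github.com/ClusterCockpit/cc-lib/v2/schema"
)

// nodeHealth resolves the MonitoringState for one node of a known subcluster.
func nodeHealth(cluster, subCluster, hostname string) schema.MonitoringState {
	ms := metricstore.GetMemoryStore()

	// Expected metrics come from the subcluster's metric configuration,
	// just as updateNodeStates builds them via metricListToNames().
	metricList := archive.GetMetricConfigSubCluster(cluster, subCluster)
	metricNames := make([]string, 0, len(metricList))
	for name := range metricList {
		metricNames = append(metricNames, name)
	}

	states, err := ms.HealthCheck(cluster, []string{hostname}, metricNames)
	if err != nil {
		// Internal error: fall back to Failed, matching the handler's default.
		return schema.MonitoringStateFailed
	}
	if state, ok := states[hostname]; ok {
		return state
	}
	return schema.MonitoringStateFailed
}
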