diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index 3c70a960..3ee05383 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -248,7 +248,7 @@ func generateJWT(authHandle *auth.Authentication, username string) error { return fmt.Errorf("getting user '%s': %w", username, err) } - if !user.HasRole(schema.RoleApi) { + if !user.HasRole(schema.RoleAPI) { cclog.Warnf("JWT: User '%s' does not have the role 'api'. REST API endpoints will return error!\n", user.Username) } diff --git a/go.mod b/go.mod index 6bcc3b08..f9bf7e42 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ tool ( require ( github.com/99designs/gqlgen v0.17.85 - github.com/ClusterCockpit/cc-lib/v2 v2.2.2 + github.com/ClusterCockpit/cc-lib/v2 v2.4.0 github.com/Masterminds/squirrel v1.5.4 github.com/aws/aws-sdk-go-v2 v1.41.1 github.com/aws/aws-sdk-go-v2/config v1.32.6 diff --git a/go.sum b/go.sum index f4f41dfd..509c659c 100644 --- a/go.sum +++ b/go.sum @@ -4,10 +4,8 @@ github.com/99designs/gqlgen v0.17.85 h1:EkGx3U2FDcxQm8YDLQSpXIAVmpDyZ3IcBMOJi2nH github.com/99designs/gqlgen v0.17.85/go.mod h1:yvs8s0bkQlRfqg03YXr3eR4OQUowVhODT/tHzCXnbOU= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU= -github.com/ClusterCockpit/cc-lib/v2 v2.2.1 h1:iCVas+Jc61zFH5S2VG3H1sc7tsn+U4lOJwUYjYZEims= -github.com/ClusterCockpit/cc-lib/v2 v2.2.1/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw= -github.com/ClusterCockpit/cc-lib/v2 v2.2.2 h1:ye4RY57I19c2cXr3XWZBS/QYYgQVeGFvsiu5HkyKq9E= -github.com/ClusterCockpit/cc-lib/v2 v2.2.2/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw= +github.com/ClusterCockpit/cc-lib/v2 v2.4.0 h1:OnZlvqSatg7yCQ2NtSR7AddpUVSiuSMZ8scF1a7nfOk= +github.com/ClusterCockpit/cc-lib/v2 v2.4.0/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= diff --git a/internal/api/cluster.go b/internal/api/cluster.go index d1c3c898..5e6e3a27 100644 --- a/internal/api/cluster.go +++ b/internal/api/cluster.go @@ -36,9 +36,9 @@ type GetClustersAPIResponse struct { // @router /api/clusters/ [get] func (api *RestAPI) getClusters(rw http.ResponseWriter, r *http.Request) { if user := repository.GetUserFromContext(r.Context()); user != nil && - !user.HasRole(schema.RoleApi) { + !user.HasRole(schema.RoleAPI) { - handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw) + handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleAPI)), http.StatusForbidden, rw) return } diff --git a/internal/api/job.go b/internal/api/job.go index 9bd93b1c..66258668 100644 --- a/internal/api/job.go +++ b/internal/api/job.go @@ -1054,8 +1054,8 @@ type GetUsedNodesAPIResponse struct { // @router /api/jobs/used_nodes [get] func (api *RestAPI) getUsedNodes(rw http.ResponseWriter, r *http.Request) { if user := repository.GetUserFromContext(r.Context()); user != nil && - !user.HasRole(schema.RoleApi) { - handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw) + !user.HasRole(schema.RoleAPI) { + handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleAPI)), http.StatusForbidden, 
rw) return } diff --git a/internal/api/node.go b/internal/api/node.go index 27cde7f0..e6b19479 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -80,7 +80,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { ms := metricstore.GetMemoryStore() m := make(map[string][]string) - healthStates := make(map[string]schema.MonitoringState) + healthResults := make(map[string]metricstore.HealthCheckResult) startMs := time.Now() @@ -94,8 +94,8 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { if sc != "" { metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc) metricNames := metricListToNames(metricList) - if states, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil { - maps.Copy(healthStates, states) + if results, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil { + maps.Copy(healthResults, results) } } } @@ -106,8 +106,10 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { for _, node := range req.Nodes { state := determineState(node.States) healthState := schema.MonitoringStateFailed - if hs, ok := healthStates[node.Hostname]; ok { - healthState = hs + var healthMetrics string + if result, ok := healthResults[node.Hostname]; ok { + healthState = result.State + healthMetrics = result.HealthMetrics } nodeState := schema.NodeStateDB{ TimeStamp: requestReceived, @@ -116,10 +118,14 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { MemoryAllocated: node.MemoryAllocated, GpusAllocated: node.GpusAllocated, HealthState: healthState, + HealthMetrics: healthMetrics, JobsRunning: node.JobsRunning, } - repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState) + if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil { + cclog.Errorf("updateNodeStates: updating node state for %s on %s failed: %v", + node.Hostname, req.Cluster, err) + } } cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB)) diff --git a/internal/api/user.go b/internal/api/user.go index 5eba0dfc..e2f78165 100644 --- a/internal/api/user.go +++ b/internal/api/user.go @@ -164,7 +164,7 @@ func (api *RestAPI) createUser(rw http.ResponseWriter, r *http.Request) { return } - if len(password) == 0 && role != schema.GetRoleString(schema.RoleApi) { + if len(password) == 0 && role != schema.GetRoleString(schema.RoleAPI) { handleError(fmt.Errorf("only API users are allowed to have a blank password (login will be impossible)"), http.StatusBadRequest, rw) return } diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 8a2073b5..9b1e2121 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -448,13 +448,13 @@ func (auth *Authentication) AuthAPI( if user != nil { switch { case len(user.Roles) == 1: - if user.HasRole(schema.RoleApi) { + if user.HasRole(schema.RoleAPI) { ctx := context.WithValue(r.Context(), repository.ContextUserKey, user) onsuccess.ServeHTTP(rw, r.WithContext(ctx)) return } case len(user.Roles) >= 2: - if user.HasAllRoles([]schema.Role{schema.RoleAdmin, schema.RoleApi}) { + if user.HasAllRoles([]schema.Role{schema.RoleAdmin, schema.RoleAPI}) { ctx := context.WithValue(r.Context(), repository.ContextUserKey, user) onsuccess.ServeHTTP(rw, r.WithContext(ctx)) return @@ -484,13 +484,13 @@ func (auth *Authentication) AuthUserAPI( if user != nil { switch { case len(user.Roles) == 1: - if user.HasRole(schema.RoleApi) { + if user.HasRole(schema.RoleAPI) { ctx := context.WithValue(r.Context(), 
repository.ContextUserKey, user) onsuccess.ServeHTTP(rw, r.WithContext(ctx)) return } case len(user.Roles) >= 2: - if user.HasRole(schema.RoleApi) && user.HasAnyRole([]schema.Role{schema.RoleUser, schema.RoleManager, schema.RoleSupport, schema.RoleAdmin}) { + if user.HasRole(schema.RoleAPI) && user.HasAnyRole([]schema.Role{schema.RoleUser, schema.RoleManager, schema.RoleSupport, schema.RoleAdmin}) { ctx := context.WithValue(r.Context(), repository.ContextUserKey, user) onsuccess.ServeHTTP(rw, r.WithContext(ctx)) return @@ -520,13 +520,13 @@ func (auth *Authentication) AuthMetricStoreAPI( if user != nil { switch { case len(user.Roles) == 1: - if user.HasRole(schema.RoleApi) { + if user.HasRole(schema.RoleAPI) { ctx := context.WithValue(r.Context(), repository.ContextUserKey, user) onsuccess.ServeHTTP(rw, r.WithContext(ctx)) return } case len(user.Roles) >= 2: - if user.HasRole(schema.RoleApi) && user.HasAnyRole([]schema.Role{schema.RoleUser, schema.RoleManager, schema.RoleAdmin}) { + if user.HasRole(schema.RoleAPI) && user.HasAnyRole([]schema.Role{schema.RoleUser, schema.RoleManager, schema.RoleAdmin}) { ctx := context.WithValue(r.Context(), repository.ContextUserKey, user) onsuccess.ServeHTTP(rw, r.WithContext(ctx)) return diff --git a/internal/config/config.go b/internal/config/config.go index 4e6fe975..2e601ed7 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -71,6 +71,23 @@ type ProgramConfig struct { // If exists, will enable dynamic zoom in frontend metric plots using the configured values EnableResampling *ResampleConfig `json:"resampling"` + + // Node state retention configuration + NodeStateRetention *NodeStateRetention `json:"nodestate-retention"` +} + +type NodeStateRetention struct { + Policy string `json:"policy"` // "delete" or "parquet" + Age int `json:"age"` // hours, default 24 + TargetKind string `json:"target-kind"` // "file" or "s3" + TargetPath string `json:"target-path"` + TargetEndpoint string `json:"target-endpoint"` + TargetBucket string `json:"target-bucket"` + TargetAccessKey string `json:"target-access-key"` + TargetSecretKey string `json:"target-secret-key"` + TargetRegion string `json:"target-region"` + TargetUsePathStyle bool `json:"target-use-path-style"` + MaxFileSizeMB int `json:"max-file-size-mb"` } type ResampleConfig struct { diff --git a/internal/config/schema.go b/internal/config/schema.go index 0d575b3c..bd1b314e 100644 --- a/internal/config/schema.go +++ b/internal/config/schema.go @@ -130,6 +130,59 @@ var configSchema = ` } }, "required": ["subject-job-event", "subject-node-state"] + }, + "nodestate-retention": { + "description": "Node state retention configuration for cleaning up old node_state rows.", + "type": "object", + "properties": { + "policy": { + "description": "Retention policy: 'delete' to remove old rows, 'parquet' to archive then delete.", + "type": "string", + "enum": ["delete", "parquet"] + }, + "age": { + "description": "Retention age in hours (default: 24).", + "type": "integer" + }, + "target-kind": { + "description": "Target kind for parquet archiving: 'file' or 's3'.", + "type": "string", + "enum": ["file", "s3"] + }, + "target-path": { + "description": "Filesystem path for parquet file target.", + "type": "string" + }, + "target-endpoint": { + "description": "S3 endpoint URL.", + "type": "string" + }, + "target-bucket": { + "description": "S3 bucket name.", + "type": "string" + }, + "target-access-key": { + "description": "S3 access key.", + "type": "string" + }, + "target-secret-key": { + 
"description": "S3 secret key.", + "type": "string" + }, + "target-region": { + "description": "S3 region.", + "type": "string" + }, + "target-use-path-style": { + "description": "Use path-style S3 addressing.", + "type": "boolean" + }, + "max-file-size-mb": { + "description": "Maximum parquet file size in MB (default: 128).", + "type": "integer" + } + }, + "required": ["policy"] } } }` diff --git a/internal/graph/generated/generated.go b/internal/graph/generated/generated.go index e1e5ea71..965fd860 100644 --- a/internal/graph/generated/generated.go +++ b/internal/graph/generated/generated.go @@ -10245,7 +10245,7 @@ func (ec *executionContext) _Series_id(ctx context.Context, field graphql.Collec field, ec.fieldContext_Series_id, func(ctx context.Context) (any, error) { - return obj.Id, nil + return obj.ID, nil }, nil, ec.marshalOString2ᚖstring, diff --git a/internal/graph/schema.resolvers.go b/internal/graph/schema.resolvers.go index 19d04eab..059bd16d 100644 --- a/internal/graph/schema.resolvers.go +++ b/internal/graph/schema.resolvers.go @@ -552,7 +552,7 @@ func (r *queryResolver) ScopedJobStats(ctx context.Context, id string, metrics [ for _, stat := range stats { mdlStats = append(mdlStats, &model.ScopedStats{ Hostname: stat.Hostname, - ID: stat.Id, + ID: stat.ID, Data: stat.Data, }) } @@ -824,6 +824,7 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub } nodeRepo := repository.GetNodeRepository() + // nodes -> array hostname nodes, stateMap, countNodes, hasNextPage, nerr := nodeRepo.GetNodesForList(ctx, cluster, subCluster, stateFilter, nodeFilter, page) if nerr != nil { return nil, errors.New("could not retrieve node list required for resolving NodeMetricsList") @@ -835,6 +836,7 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub } } + // data -> map hostname:jobdata data, err := metricdispatch.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx) if err != nil { cclog.Warn("error while loading node data (Resolver.NodeMetricsList") @@ -842,18 +844,18 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub } nodeMetricsList := make([]*model.NodeMetrics, 0, len(data)) - for hostname, metrics := range data { + for _, hostname := range nodes { host := &model.NodeMetrics{ Host: hostname, State: stateMap[hostname], - Metrics: make([]*model.JobMetricWithName, 0, len(metrics)*len(scopes)), + Metrics: make([]*model.JobMetricWithName, 0), } host.SubCluster, err = archive.GetSubClusterByNode(cluster, hostname) if err != nil { cclog.Warnf("error in nodeMetrics resolver: %s", err) } - for metric, scopedMetrics := range metrics { + for metric, scopedMetrics := range data[hostname] { for scope, scopedMetric := range scopedMetrics { host.Metrics = append(host.Metrics, &model.JobMetricWithName{ Name: metric, @@ -867,7 +869,8 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub } nodeMetricsListResult := &model.NodesResultList{ - Items: nodeMetricsList, + Items: nodeMetricsList, + // TotalNodes depends on sum of nodes grouped on latest timestamp, see repo/node.go:357 TotalNodes: &countNodes, HasNextPage: &hasNextPage, } diff --git a/internal/metricdispatch/dataLoader.go b/internal/metricdispatch/dataLoader.go index 78808a74..c420fee4 100644 --- a/internal/metricdispatch/dataLoader.go +++ b/internal/metricdispatch/dataLoader.go @@ -499,7 +499,7 @@ func copyJobMetric(src *schema.JobMetric) *schema.JobMetric { func copySeries(src *schema.Series) 
schema.Series { dst := schema.Series{ Hostname: src.Hostname, - Id: src.Id, + ID: src.ID, Statistics: src.Statistics, Data: make([]schema.Float, len(src.Data)), } diff --git a/internal/metricdispatch/dataLoader_test.go b/internal/metricdispatch/dataLoader_test.go index c4841f8d..65a366f9 100644 --- a/internal/metricdispatch/dataLoader_test.go +++ b/internal/metricdispatch/dataLoader_test.go @@ -21,7 +21,7 @@ func TestDeepCopy(t *testing.T) { Series: []schema.Series{ { Hostname: "node001", - Id: &nodeId, + ID: &nodeId, Data: []schema.Float{1.0, 2.0, 3.0}, Statistics: schema.MetricStatistics{ Min: 1.0, diff --git a/internal/metricstoreclient/cc-metric-store.go b/internal/metricstoreclient/cc-metric-store.go index aadbe1b1..4472b825 100644 --- a/internal/metricstoreclient/cc-metric-store.go +++ b/internal/metricstoreclient/cc-metric-store.go @@ -267,7 +267,7 @@ func (ccms *CCMetricStore) LoadData( jobMetric.Series = append(jobMetric.Series, schema.Series{ Hostname: query.Hostname, - Id: id, + ID: id, Statistics: schema.MetricStatistics{ Avg: float64(res.Avg), Min: float64(res.Min), @@ -419,7 +419,7 @@ func (ccms *CCMetricStore) LoadScopedStats( scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ Hostname: query.Hostname, - Id: id, + ID: id, Data: &schema.MetricStatistics{ Avg: float64(res.Avg), Min: float64(res.Min), @@ -634,7 +634,7 @@ func (ccms *CCMetricStore) LoadNodeListData( scopeData.Series = append(scopeData.Series, schema.Series{ Hostname: query.Hostname, - Id: id, + ID: id, Statistics: schema.MetricStatistics{ Avg: float64(res.Avg), Min: float64(res.Min), diff --git a/internal/repository/jobQuery.go b/internal/repository/jobQuery.go index 658413e8..81779583 100644 --- a/internal/repository/jobQuery.go +++ b/internal/repository/jobQuery.go @@ -150,7 +150,7 @@ func SecurityCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.Select } switch { - case len(user.Roles) == 1 && user.HasRole(schema.RoleApi): + case len(user.Roles) == 1 && user.HasRole(schema.RoleAPI): return query, nil case user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}): return query, nil diff --git a/internal/repository/migrations/sqlite3/10_node-table.up.sql b/internal/repository/migrations/sqlite3/10_node-table.up.sql index 7b5b5ac7..fd118f5d 100644 --- a/internal/repository/migrations/sqlite3/10_node-table.up.sql +++ b/internal/repository/migrations/sqlite3/10_node-table.up.sql @@ -23,6 +23,7 @@ CREATE TABLE "node_state" ( CHECK (health_state IN ( 'full', 'partial', 'failed' )), + health_metrics TEXT, -- JSON array of strings node_id INTEGER, FOREIGN KEY (node_id) REFERENCES node (id) ); diff --git a/internal/repository/node.go b/internal/repository/node.go index df3aec8b..08a694c6 100644 --- a/internal/repository/node.go +++ b/internal/repository/node.go @@ -169,9 +169,10 @@ func (r *NodeRepository) AddNode(node *schema.NodeDB) (int64, error) { } const NamedNodeStateInsert string = ` -INSERT INTO node_state (time_stamp, node_state, health_state, cpus_allocated, - memory_allocated, gpus_allocated, jobs_running, node_id) - VALUES (:time_stamp, :node_state, :health_state, :cpus_allocated, :memory_allocated, :gpus_allocated, :jobs_running, :node_id);` +INSERT INTO node_state (time_stamp, node_state, health_state, health_metrics, + cpus_allocated, memory_allocated, gpus_allocated, jobs_running, node_id) + VALUES (:time_stamp, :node_state, :health_state, :health_metrics, + :cpus_allocated, :memory_allocated, :gpus_allocated, :jobs_running, :node_id);` // 
TODO: Add real Monitoring Health State @@ -224,6 +225,75 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt // return nil // } +// NodeStateWithNode combines a node state row with denormalized node info. +type NodeStateWithNode struct { + ID int64 `db:"id"` + TimeStamp int64 `db:"time_stamp"` + NodeState string `db:"node_state"` + HealthState string `db:"health_state"` + HealthMetrics string `db:"health_metrics"` + CpusAllocated int `db:"cpus_allocated"` + MemoryAllocated int64 `db:"memory_allocated"` + GpusAllocated int `db:"gpus_allocated"` + JobsRunning int `db:"jobs_running"` + Hostname string `db:"hostname"` + Cluster string `db:"cluster"` + SubCluster string `db:"subcluster"` +} + +// FindNodeStatesBefore returns all node_state rows with time_stamp < cutoff, +// joined with node info for denormalized archiving. +func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode, error) { + rows, err := sq.Select( + "node_state.id", "node_state.time_stamp", "node_state.node_state", + "node_state.health_state", "node_state.health_metrics", + "node_state.cpus_allocated", "node_state.memory_allocated", + "node_state.gpus_allocated", "node_state.jobs_running", + "node.hostname", "node.cluster", "node.subcluster", + ). + From("node_state"). + Join("node ON node_state.node_id = node.id"). + Where(sq.Lt{"node_state.time_stamp": cutoff}). + Where("node_state.id NOT IN (SELECT ns2.id FROM node_state ns2 WHERE ns2.time_stamp = (SELECT MAX(ns3.time_stamp) FROM node_state ns3 WHERE ns3.node_id = ns2.node_id))"). + OrderBy("node_state.time_stamp ASC"). + RunWith(r.DB).Query() + if err != nil { + return nil, err + } + defer rows.Close() + + var result []NodeStateWithNode + for rows.Next() { + var ns NodeStateWithNode + if err := rows.Scan(&ns.ID, &ns.TimeStamp, &ns.NodeState, + &ns.HealthState, &ns.HealthMetrics, + &ns.CpusAllocated, &ns.MemoryAllocated, + &ns.GpusAllocated, &ns.JobsRunning, + &ns.Hostname, &ns.Cluster, &ns.SubCluster); err != nil { + return nil, err + } + result = append(result, ns) + } + return result, nil +} + +// DeleteNodeStatesBefore removes node_state rows with time_stamp < cutoff, +// but always preserves the row with the latest timestamp per node_id. +func (r *NodeRepository) DeleteNodeStatesBefore(cutoff int64) (int64, error) { + res, err := r.DB.Exec( + `DELETE FROM node_state WHERE time_stamp < ? + AND id NOT IN ( + SELECT id FROM node_state ns2 + WHERE ns2.time_stamp = (SELECT MAX(ns3.time_stamp) FROM node_state ns3 WHERE ns3.node_id = ns2.node_id) + )`, + cutoff, + ) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + func (r *NodeRepository) DeleteNode(id int64) error { _, err := r.DB.Exec(`DELETE FROM node WHERE node.id = ?`, id) if err != nil { @@ -263,14 +333,16 @@ func (r *NodeRepository) QueryNodes( if f.SchedulerState != nil { query = query.Where("node_state = ?", f.SchedulerState) // Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned + // TODO: Hardcoded TimeDiff Suboptimal - Use Config Option? now := time.Now().Unix() - query = query.Where(sq.Gt{"time_stamp": (now - 60)}) + query = query.Where(sq.Gt{"time_stamp": (now - 300)}) } if f.HealthState != nil { query = query.Where("health_state = ?", f.HealthState) // Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned + // TODO: Hardcoded TimeDiff Suboptimal - Use Config Option? 
now := time.Now().Unix() - query = query.Where(sq.Gt{"time_stamp": (now - 60)}) + query = query.Where(sq.Gt{"time_stamp": (now - 300)}) } } @@ -331,14 +403,16 @@ func (r *NodeRepository) CountNodes( if f.SchedulerState != nil { query = query.Where("node_state = ?", f.SchedulerState) // Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned + // TODO: Hardcoded TimeDiff Suboptimal - Use Config Option? now := time.Now().Unix() - query = query.Where(sq.Gt{"time_stamp": (now - 60)}) + query = query.Where(sq.Gt{"time_stamp": (now - 300)}) } if f.HealthState != nil { query = query.Where("health_state = ?", f.HealthState) // Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned + // TODO: Hardcoded TimeDiff Suboptimal - Use Config Option? now := time.Now().Unix() - query = query.Where(sq.Gt{"time_stamp": (now - 60)}) + query = query.Where(sq.Gt{"time_stamp": (now - 300)}) } } diff --git a/internal/repository/node_test.go b/internal/repository/node_test.go index 4286ab34..d1e86b9a 100644 --- a/internal/repository/node_test.go +++ b/internal/repository/node_test.go @@ -156,8 +156,12 @@ func nodeTestSetup(t *testing.T) { func TestUpdateNodeState(t *testing.T) { nodeTestSetup(t) + repo := GetNodeRepository() + now := time.Now().Unix() + nodeState := schema.NodeStateDB{ - TimeStamp: time.Now().Unix(), NodeState: "allocated", + TimeStamp: now, + NodeState: "allocated", CpusAllocated: 72, MemoryAllocated: 480, GpusAllocated: 0, @@ -165,18 +169,152 @@ func TestUpdateNodeState(t *testing.T) { JobsRunning: 1, } - repo := GetNodeRepository() err := repo.UpdateNodeState("host124", "testcluster", &nodeState) if err != nil { - return + t.Fatal(err) } node, err := repo.GetNode("host124", "testcluster", false) if err != nil { - return + t.Fatal(err) } if node.NodeState != "allocated" { t.Errorf("wrong node state\ngot: %s \nwant: allocated ", node.NodeState) } + + t.Run("FindBeforeEmpty", func(t *testing.T) { + // Only the current-timestamp row exists, so nothing should be found before now + rows, err := repo.FindNodeStatesBefore(now) + if err != nil { + t.Fatal(err) + } + if len(rows) != 0 { + t.Errorf("expected 0 rows, got %d", len(rows)) + } + }) + + t.Run("DeleteOldRows", func(t *testing.T) { + // Insert 2 more old rows for host124 + for i, ts := range []int64{now - 7200, now - 3600} { + ns := schema.NodeStateDB{ + TimeStamp: ts, + NodeState: "allocated", + HealthState: schema.MonitoringStateFull, + CpusAllocated: 72, + MemoryAllocated: 480, + JobsRunning: i, + } + if err := repo.UpdateNodeState("host124", "testcluster", &ns); err != nil { + t.Fatal(err) + } + } + + // Delete rows older than 30 minutes + cutoff := now - 1800 + cnt, err := repo.DeleteNodeStatesBefore(cutoff) + if err != nil { + t.Fatal(err) + } + + // Should delete the 2 old rows + if cnt != 2 { + t.Errorf("expected 2 deleted rows, got %d", cnt) + } + + // Latest row should still exist + node, err := repo.GetNode("host124", "testcluster", false) + if err != nil { + t.Fatal(err) + } + if node.NodeState != "allocated" { + t.Errorf("expected node state 'allocated', got %s", node.NodeState) + } + }) + + t.Run("PreservesLatestPerNode", func(t *testing.T) { + // Insert a single old row for host125 — it's the latest per node so it must survive + ns := schema.NodeStateDB{ + TimeStamp: now - 7200, + NodeState: "idle", + HealthState: schema.MonitoringStateFull, + CpusAllocated: 0, + MemoryAllocated: 0, + JobsRunning: 0, + } + if err := 
repo.UpdateNodeState("host125", "testcluster", &ns); err != nil { + t.Fatal(err) + } + + // Delete everything older than now — the latest per node should be preserved + _, err := repo.DeleteNodeStatesBefore(now) + if err != nil { + t.Fatal(err) + } + + // The latest row for host125 must still exist + node, err := repo.GetNode("host125", "testcluster", false) + if err != nil { + t.Fatal(err) + } + if node.NodeState != "idle" { + t.Errorf("expected node state 'idle', got %s", node.NodeState) + } + + // Verify exactly 1 row remains for host125 + var countAfter int + if err := repo.DB.QueryRow( + "SELECT COUNT(*) FROM node_state WHERE node_id = (SELECT id FROM node WHERE hostname = 'host125')"). + Scan(&countAfter); err != nil { + t.Fatal(err) + } + if countAfter != 1 { + t.Errorf("expected 1 row remaining for host125, got %d", countAfter) + } + }) + + t.Run("FindBeforeWithJoin", func(t *testing.T) { + // Insert old and current rows for host123 + for _, ts := range []int64{now - 7200, now} { + ns := schema.NodeStateDB{ + TimeStamp: ts, + NodeState: "allocated", + HealthState: schema.MonitoringStateFull, + CpusAllocated: 8, + MemoryAllocated: 1024, + GpusAllocated: 1, + JobsRunning: 1, + } + if err := repo.UpdateNodeState("host123", "testcluster", &ns); err != nil { + t.Fatal(err) + } + } + + // Find rows older than 30 minutes, excluding latest per node + cutoff := now - 1800 + rows, err := repo.FindNodeStatesBefore(cutoff) + if err != nil { + t.Fatal(err) + } + + // Should find the old host123 row + found := false + for _, row := range rows { + if row.Hostname == "host123" && row.TimeStamp == now-7200 { + found = true + if row.Cluster != "testcluster" { + t.Errorf("expected cluster 'testcluster', got %s", row.Cluster) + } + if row.SubCluster != "sc1" { + t.Errorf("expected subcluster 'sc1', got %s", row.SubCluster) + } + if row.CpusAllocated != 8 { + t.Errorf("expected cpus_allocated 8, got %d", row.CpusAllocated) + } + } + } + if !found { + t.Errorf("expected to find old host123 row among %d results", len(rows)) + } + }) } diff --git a/internal/repository/tags.go b/internal/repository/tags.go index 612666da..943dda66 100644 --- a/internal/repository/tags.go +++ b/internal/repository/tags.go @@ -644,12 +644,12 @@ func (r *JobRepository) checkScopeAuth(user *schema.User, operation string, scop if user != nil { switch { case operation == "write" && scope == "admin": - if user.HasRole(schema.RoleAdmin) || (len(user.Roles) == 1 && user.HasRole(schema.RoleApi)) { + if user.HasRole(schema.RoleAdmin) || (len(user.Roles) == 1 && user.HasRole(schema.RoleAPI)) { return true, nil } return false, nil case operation == "write" && scope == "global": - if user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}) || (len(user.Roles) == 1 && user.HasRole(schema.RoleApi)) { + if user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}) || (len(user.Roles) == 1 && user.HasRole(schema.RoleAPI)) { return true, nil } return false, nil diff --git a/internal/taskmanager/nodestateRetentionService.go b/internal/taskmanager/nodestateRetentionService.go new file mode 100644 index 00000000..9a704502 --- /dev/null +++ b/internal/taskmanager/nodestateRetentionService.go @@ -0,0 +1,120 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+ +package taskmanager + +import ( + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/repository" + pqarchive "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/go-co-op/gocron/v2" +) + +func RegisterNodeStateRetentionDeleteService(ageHours int) { + cclog.Info("Register node state retention delete service") + + s.NewJob(gocron.DurationJob(1*time.Hour), + gocron.NewTask( + func() { + cutoff := time.Now().Unix() - int64(ageHours*3600) + nodeRepo := repository.GetNodeRepository() + cnt, err := nodeRepo.DeleteNodeStatesBefore(cutoff) + if err != nil { + cclog.Errorf("NodeState retention: error deleting old rows: %v", err) + } else if cnt > 0 { + cclog.Infof("NodeState retention: deleted %d old rows", cnt) + } + })) +} + +func RegisterNodeStateRetentionParquetService(cfg *config.NodeStateRetention) { + cclog.Info("Register node state retention parquet service") + + maxFileSizeMB := cfg.MaxFileSizeMB + if maxFileSizeMB <= 0 { + maxFileSizeMB = 128 + } + + ageHours := cfg.Age + if ageHours <= 0 { + ageHours = 24 + } + + var target pqarchive.ParquetTarget + var err error + + switch cfg.TargetKind { + case "s3": + target, err = pqarchive.NewS3Target(pqarchive.S3TargetConfig{ + Endpoint: cfg.TargetEndpoint, + Bucket: cfg.TargetBucket, + AccessKey: cfg.TargetAccessKey, + SecretKey: cfg.TargetSecretKey, + Region: cfg.TargetRegion, + UsePathStyle: cfg.TargetUsePathStyle, + }) + default: + target, err = pqarchive.NewFileTarget(cfg.TargetPath) + } + + if err != nil { + cclog.Errorf("NodeState parquet retention: failed to create target: %v", err) + return + } + + s.NewJob(gocron.DurationJob(1*time.Hour), + gocron.NewTask( + func() { + cutoff := time.Now().Unix() - int64(ageHours*3600) + nodeRepo := repository.GetNodeRepository() + + rows, err := nodeRepo.FindNodeStatesBefore(cutoff) + if err != nil { + cclog.Errorf("NodeState parquet retention: error finding rows: %v", err) + return + } + if len(rows) == 0 { + return + } + + cclog.Infof("NodeState parquet retention: archiving %d rows", len(rows)) + pw := pqarchive.NewNodeStateParquetWriter(target, maxFileSizeMB) + + for _, ns := range rows { + row := pqarchive.ParquetNodeStateRow{ + TimeStamp: ns.TimeStamp, + NodeState: ns.NodeState, + HealthState: ns.HealthState, + HealthMetrics: ns.HealthMetrics, + CpusAllocated: int32(ns.CpusAllocated), + MemoryAllocated: ns.MemoryAllocated, + GpusAllocated: int32(ns.GpusAllocated), + JobsRunning: int32(ns.JobsRunning), + Hostname: ns.Hostname, + Cluster: ns.Cluster, + SubCluster: ns.SubCluster, + } + if err := pw.AddRow(row); err != nil { + cclog.Errorf("NodeState parquet retention: add row: %v", err) + continue + } + } + + if err := pw.Close(); err != nil { + cclog.Errorf("NodeState parquet retention: close writer: %v", err) + return + } + + cnt, err := nodeRepo.DeleteNodeStatesBefore(cutoff) + if err != nil { + cclog.Errorf("NodeState parquet retention: error deleting rows: %v", err) + } else { + cclog.Infof("NodeState parquet retention: deleted %d rows from db", cnt) + } + })) +} diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index e323557b..8cf6b4e6 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -144,9 +144,30 @@ func Start(cronCfg, archiveConfig json.RawMessage) { RegisterUpdateDurationWorker() RegisterCommitJobService() + if config.Keys.NodeStateRetention != nil && 
config.Keys.NodeStateRetention.Policy != "" { + initNodeStateRetention() + } + s.Start() } +func initNodeStateRetention() { + cfg := config.Keys.NodeStateRetention + age := cfg.Age + if age <= 0 { + age = 24 + } + + switch cfg.Policy { + case "delete": + RegisterNodeStateRetentionDeleteService(age) + case "parquet": + RegisterNodeStateRetentionParquetService(cfg) + default: + cclog.Warnf("Unknown nodestate-retention policy: %s", cfg.Policy) + } +} + // Shutdown stops the task manager and its scheduler. func Shutdown() { if s != nil { diff --git a/pkg/archive/json.go b/pkg/archive/json.go index cf1b0a38..dd37075d 100644 --- a/pkg/archive/json.go +++ b/pkg/archive/json.go @@ -51,7 +51,7 @@ func DecodeJobStats(r io.Reader, k string) (schema.ScopedJobStats, error) { for _, series := range jobMetric.Series { scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ Hostname: series.Hostname, - Id: series.Id, + ID: series.ID, Data: &series.Statistics, }) } diff --git a/pkg/archive/parquet/convert.go b/pkg/archive/parquet/convert.go index ceaa3f2f..ba1e76eb 100644 --- a/pkg/archive/parquet/convert.go +++ b/pkg/archive/parquet/convert.go @@ -81,7 +81,6 @@ func JobToParquetRow(meta *schema.Job, data *schema.JobData) (*ParquetJobRow, er NumNodes: meta.NumNodes, NumHWThreads: meta.NumHWThreads, NumAcc: meta.NumAcc, - Exclusive: meta.Exclusive, Energy: meta.Energy, SMT: meta.SMT, ResourcesJSON: resourcesJSON, diff --git a/pkg/archive/parquet/nodestate_schema.go b/pkg/archive/parquet/nodestate_schema.go new file mode 100644 index 00000000..c9dfe363 --- /dev/null +++ b/pkg/archive/parquet/nodestate_schema.go @@ -0,0 +1,20 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package parquet + +type ParquetNodeStateRow struct { + TimeStamp int64 `parquet:"time_stamp"` + NodeState string `parquet:"node_state"` + HealthState string `parquet:"health_state"` + HealthMetrics string `parquet:"health_metrics,optional"` + CpusAllocated int32 `parquet:"cpus_allocated"` + MemoryAllocated int64 `parquet:"memory_allocated"` + GpusAllocated int32 `parquet:"gpus_allocated"` + JobsRunning int32 `parquet:"jobs_running"` + Hostname string `parquet:"hostname"` + Cluster string `parquet:"cluster"` + SubCluster string `parquet:"subcluster"` +} diff --git a/pkg/archive/parquet/nodestate_writer.go b/pkg/archive/parquet/nodestate_writer.go new file mode 100644 index 00000000..053417d6 --- /dev/null +++ b/pkg/archive/parquet/nodestate_writer.go @@ -0,0 +1,104 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package parquet + +import ( + "bytes" + "fmt" + "time" + + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + pq "github.com/parquet-go/parquet-go" +) + +// NodeStateParquetWriter batches ParquetNodeStateRows and flushes them to a target +// when the estimated size exceeds maxSizeBytes. +type NodeStateParquetWriter struct { + target ParquetTarget + maxSizeBytes int64 + rows []ParquetNodeStateRow + currentSize int64 + fileCounter int + datePrefix string +} + +// NewNodeStateParquetWriter creates a new writer for node state parquet files. 
+func NewNodeStateParquetWriter(target ParquetTarget, maxSizeMB int) *NodeStateParquetWriter { + return &NodeStateParquetWriter{ + target: target, + maxSizeBytes: int64(maxSizeMB) * 1024 * 1024, + datePrefix: time.Now().Format("2006-01-02"), + } +} + +// AddRow adds a row to the current batch. If the estimated batch size +// exceeds the configured maximum, the batch is flushed first. +func (pw *NodeStateParquetWriter) AddRow(row ParquetNodeStateRow) error { + rowSize := estimateNodeStateRowSize(&row) + + if pw.currentSize+rowSize > pw.maxSizeBytes && len(pw.rows) > 0 { + if err := pw.Flush(); err != nil { + return err + } + } + + pw.rows = append(pw.rows, row) + pw.currentSize += rowSize + return nil +} + +// Flush writes the current batch to a parquet file on the target. +func (pw *NodeStateParquetWriter) Flush() error { + if len(pw.rows) == 0 { + return nil + } + + pw.fileCounter++ + fileName := fmt.Sprintf("cc-nodestate-%s-%03d.parquet", pw.datePrefix, pw.fileCounter) + + data, err := writeNodeStateParquetBytes(pw.rows) + if err != nil { + return fmt.Errorf("write parquet buffer: %w", err) + } + + if err := pw.target.WriteFile(fileName, data); err != nil { + return fmt.Errorf("write parquet file %q: %w", fileName, err) + } + + cclog.Infof("NodeState retention: wrote %s (%d rows, %d bytes)", fileName, len(pw.rows), len(data)) + pw.rows = pw.rows[:0] + pw.currentSize = 0 + return nil +} + +// Close flushes any remaining rows and finalizes the writer. +func (pw *NodeStateParquetWriter) Close() error { + return pw.Flush() +} + +func writeNodeStateParquetBytes(rows []ParquetNodeStateRow) ([]byte, error) { + var buf bytes.Buffer + + writer := pq.NewGenericWriter[ParquetNodeStateRow](&buf, + pq.Compression(&pq.Snappy), + ) + + if _, err := writer.Write(rows); err != nil { + return nil, err + } + if err := writer.Close(); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +func estimateNodeStateRowSize(row *ParquetNodeStateRow) int64 { + size := int64(100) // fixed numeric fields + size += int64(len(row.NodeState) + len(row.HealthState) + len(row.HealthMetrics)) + size += int64(len(row.Hostname) + len(row.Cluster) + len(row.SubCluster)) + return size +} diff --git a/pkg/archive/parquet/writer_test.go b/pkg/archive/parquet/writer_test.go index 6baaa527..e532e472 100644 --- a/pkg/archive/parquet/writer_test.go +++ b/pkg/archive/parquet/writer_test.go @@ -47,7 +47,6 @@ func makeTestJob(jobID int64) (*schema.Job, *schema.JobData) { Walltime: 7200, NumNodes: 2, NumHWThreads: 16, - Exclusive: 1, SMT: 1, Resources: []*schema.Resource{ {Hostname: "node001"}, diff --git a/pkg/metricstore/healthcheck.go b/pkg/metricstore/healthcheck.go index ed1ff38e..d6def692 100644 --- a/pkg/metricstore/healthcheck.go +++ b/pkg/metricstore/healthcheck.go @@ -6,6 +6,7 @@ package metricstore import ( + "encoding/json" "fmt" "time" @@ -19,6 +20,13 @@ type HealthCheckResponse struct { Error error } +// HealthCheckResult holds the monitoring state and raw JSON health metrics +// for a single node as determined by HealthCheck. +type HealthCheckResult struct { + State schema.MonitoringState + HealthMetrics string // JSON: {"missing":[...],"degraded":[...]} +} + // MaxMissingDataPoints is the threshold for stale data detection. // A buffer is considered healthy if the gap between its last data point // and the current time is within MaxMissingDataPoints * frequency. 
@@ -134,15 +142,15 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []str // - MonitoringStateFailed: node not found, or no healthy metrics at all func (m *MemoryStore) HealthCheck(cluster string, nodes []string, expectedMetrics []string, -) (map[string]schema.MonitoringState, error) { - results := make(map[string]schema.MonitoringState, len(nodes)) +) (map[string]HealthCheckResult, error) { + results := make(map[string]HealthCheckResult, len(nodes)) for _, hostname := range nodes { selector := []string{cluster, hostname} degradedList, missingList, err := m.GetHealthyMetrics(selector, expectedMetrics) if err != nil { - results[hostname] = schema.MonitoringStateFailed + results[hostname] = HealthCheckResult{State: schema.MonitoringStateFailed} continue } @@ -158,13 +166,24 @@ func (m *MemoryStore) HealthCheck(cluster string, cclog.ComponentInfo("metricstore", "HealthCheck: node ", hostname, "missing metrics:", missingList) } + var state schema.MonitoringState switch { case degradedCount == 0 && missingCount == 0: - results[hostname] = schema.MonitoringStateFull + state = schema.MonitoringStateFull case healthyCount == 0: - results[hostname] = schema.MonitoringStateFailed + state = schema.MonitoringStateFailed default: - results[hostname] = schema.MonitoringStatePartial + state = schema.MonitoringStatePartial + } + + hm, _ := json.Marshal(map[string][]string{ + "missing": missingList, + "degraded": degradedList, + }) + + results[hostname] = HealthCheckResult{ + State: state, + HealthMetrics: string(hm), } } diff --git a/pkg/metricstore/metricstore_test.go b/pkg/metricstore/metricstore_test.go index 4d68d76c..a9ff0055 100644 --- a/pkg/metricstore/metricstore_test.go +++ b/pkg/metricstore/metricstore_test.go @@ -253,8 +253,8 @@ func TestHealthCheck(t *testing.T) { // Check status if wantStatus, ok := tt.wantStates[node]; ok { - if state != wantStatus { - t.Errorf("HealthCheck() node %s status = %v, want %v", node, state, wantStatus) + if state.State != wantStatus { + t.Errorf("HealthCheck() node %s status = %v, want %v", node, state.State, wantStatus) } } } diff --git a/pkg/metricstore/query.go b/pkg/metricstore/query.go index e5a49af3..709a9710 100644 --- a/pkg/metricstore/query.go +++ b/pkg/metricstore/query.go @@ -149,7 +149,7 @@ func (ccms *InternalMetricStore) LoadData( jobMetric.Series = append(jobMetric.Series, schema.Series{ Hostname: query.Hostname, - Id: id, + ID: id, Statistics: schema.MetricStatistics{ Avg: float64(res.Avg), Min: float64(res.Min), @@ -651,7 +651,7 @@ func (ccms *InternalMetricStore) LoadScopedStats( scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{ Hostname: query.Hostname, - Id: id, + ID: id, Data: &schema.MetricStatistics{ Avg: float64(res.Avg), Min: float64(res.Min), @@ -894,7 +894,7 @@ func (ccms *InternalMetricStore) LoadNodeListData( scopeData.Series = append(scopeData.Series, schema.Series{ Hostname: query.Hostname, - Id: id, + ID: id, Statistics: schema.MetricStatistics{ Avg: float64(res.Avg), Min: float64(res.Min), diff --git a/web/frontend/src/Job.root.svelte b/web/frontend/src/Job.root.svelte index 99dfa7ac..3baed1c1 100644 --- a/web/frontend/src/Job.root.svelte +++ b/web/frontend/src/Job.root.svelte @@ -30,7 +30,7 @@ import { init, groupByScope, - checkMetricDisabled, + checkMetricAvailability, } from "./generic/utils.js"; import Metric from "./job/Metric.svelte"; import MetricSelection from "./generic/select/MetricSelection.svelte"; @@ -151,17 +151,17 @@ } return names; }, []); - + 
// return metricNames.filter( (metric) => !metrics.some((jm) => jm.name == metric) && selectedMetrics.includes(metric) && - !checkMetricDisabled( + (checkMetricAvailability( globalMetrics, metric, thisJob.cluster, thisJob.subCluster, - ), + ) == "configured") ); } else { return [] @@ -212,7 +212,7 @@ inputMetrics.map((metric) => ({ metric: metric, data: grouped.find((group) => group[0].name == metric), - disabled: checkMetricDisabled( + availability: checkMetricAvailability( globalMetrics, metric, thisJob.cluster, @@ -333,7 +333,17 @@ {:else if thisJob && $jobMetrics?.data?.scopedJobStats} {#snippet gridContent(item)} - {#if item?.disabled} + {#if item.availability == "none"} + + + Metric not configured + + +

+ No datasets returned for {item.metric}.
+ Metric is not configured for cluster {thisJob.cluster}.
+ {:else if item.availability == "disabled"} Disabled Metric diff --git a/web/frontend/src/Jobs.root.svelte b/web/frontend/src/Jobs.root.svelte index 0d543fc8..a06aee3c 100644 --- a/web/frontend/src/Jobs.root.svelte +++ b/web/frontend/src/Jobs.root.svelte @@ -142,7 +142,8 @@ 0)} shortJobCutoff={ccconfig?.jobList_hideShortRunningJobs} showFilter={!showCompare} matchedJobs={showCompare? matchedCompareJobs: matchedListJobs} diff --git a/web/frontend/src/Node.root.svelte b/web/frontend/src/Node.root.svelte index d3364b49..06056466 100644 --- a/web/frontend/src/Node.root.svelte +++ b/web/frontend/src/Node.root.svelte @@ -32,7 +32,7 @@ } from "@urql/svelte"; import { init, - checkMetricDisabled, + checkMetricAvailability, } from "./generic/utils.js"; import PlotGrid from "./generic/PlotGrid.svelte"; import MetricPlot from "./generic/plots/MetricPlot.svelte"; @@ -242,17 +242,17 @@ {item.name} {systemUnits[item.name] ? "(" + systemUnits[item.name] + ")" : ""} - {#if item.disabled === false && item.metric} - c.name == cluster)} - subCluster={$nodeMetricsData.data.nodeMetrics[0].subCluster} - series={item.metric.series} - enableFlip - forNode - /> - {:else if item.disabled === true && item.metric} + {#if item.availability == "none"} + + + Metric not configured + + +

+ No datasets returned for {item.name}.
+ Metric is not configured for cluster {cluster}.
+ {:else if item.availability == "disabled"} Disabled Metric @@ -262,6 +262,16 @@

Metric has been disabled for subcluster {$nodeMetricsData.data.nodeMetrics[0].subCluster}.

+ {:else if item?.metric} + c.name == cluster)} + subCluster={$nodeMetricsData.data.nodeMetrics[0].subCluster} + series={item.metric.series} + enableFlip + forNode + /> {:else} @@ -279,7 +289,7 @@ items={$nodeMetricsData.data.nodeMetrics[0].metrics .map((m) => ({ ...m, - disabled: checkMetricDisabled( + availability: checkMetricAvailability( globalMetrics, m.name, cluster, diff --git a/web/frontend/src/Systems.root.svelte b/web/frontend/src/Systems.root.svelte index fb5c4495..d89b5f06 100644 --- a/web/frontend/src/Systems.root.svelte +++ b/web/frontend/src/Systems.root.svelte @@ -272,8 +272,8 @@ {:else} - + {/if} {/if} diff --git a/web/frontend/src/User.root.svelte b/web/frontend/src/User.root.svelte index 4ee3f892..d086df14 100644 --- a/web/frontend/src/User.root.svelte +++ b/web/frontend/src/User.root.svelte @@ -219,7 +219,8 @@ 0)} shortJobCutoff={ccconfig?.jobList_hideShortRunningJobs} showFilter={!showCompare} matchedJobs={showCompare? matchedCompareJobs: matchedListJobs} diff --git a/web/frontend/src/generic/Filters.svelte b/web/frontend/src/generic/Filters.svelte index 5a8bcf23..4031d0a5 100644 --- a/web/frontend/src/generic/Filters.svelte +++ b/web/frontend/src/generic/Filters.svelte @@ -451,7 +451,7 @@ {#if filters.startTime.range} (isStartTimeOpen = true)}> - {startTimeSelectOptions.find((stso) => stso.range === filters.startTime.range).rangeLabel } + Job Start: {startTimeSelectOptions.find((stso) => stso.range === filters.startTime.range).rangeLabel } {/if} diff --git a/web/frontend/src/generic/filters/StartTime.svelte b/web/frontend/src/generic/filters/StartTime.svelte index 2eceaf6e..1a298349 100644 --- a/web/frontend/src/generic/filters/StartTime.svelte +++ b/web/frontend/src/generic/filters/StartTime.svelte @@ -14,10 +14,10 @@ diff --git a/web/frontend/src/generic/joblist/JobListRow.svelte b/web/frontend/src/generic/joblist/JobListRow.svelte index 5d129ad0..9db340d4 100644 --- a/web/frontend/src/generic/joblist/JobListRow.svelte +++ b/web/frontend/src/generic/joblist/JobListRow.svelte @@ -19,7 +19,7 @@ @@ -168,19 +151,25 @@ {/if} {#each refinedData as metricData, i (metricData?.data?.name || i)} - - {#if metricData?.disabled} - -

- No dataset(s) returned for {selectedMetrics[i]}
- Metric has been disabled for subcluster {nodeData.subCluster}.
- {:else if !metricData?.data} - -

- No dataset(s) returned for {selectedMetrics[i]}
- Metric was not found in metric store for cluster {cluster}.
- {:else if !!metricData.data?.metric.statisticsSeries} - + {#key metricData} + + {#if metricData?.availability == "none"} + +

+ No dataset(s) returned for {selectedMetrics[i]}
+ Metric is not configured for cluster {cluster}.
+ {:else if metricData?.availability == "disabled"} + +

+ No dataset(s) returned for {selectedMetrics[i]}
+ Metric has been disabled for subcluster {nodeData.subCluster}.
+ {:else if !metricData?.data} + +

+ No dataset(s) returned for {selectedMetrics[i]}
+ Metric or host was not found in metric store for cluster {cluster}.
+ {:else if !!metricData.data?.metric.statisticsSeries} + -
- {#key extendedLegendData} +
- {/key} - {:else} + {:else} - {/if} - + {/if} + + {/key} {/each}