mirror of https://github.com/ClusterCockpit/cc-backend
@@ -248,7 +248,7 @@ func generateJWT(authHandle *auth.Authentication, username string) error {
return fmt.Errorf("getting user '%s': %w", username, err)
}

if !user.HasRole(schema.RoleApi) {
if !user.HasRole(schema.RoleAPI) {
cclog.Warnf("JWT: User '%s' does not have the role 'api'. REST API endpoints will return error!\n", user.Username)
}
go.mod
@@ -9,7 +9,7 @@ tool (

require (
github.com/99designs/gqlgen v0.17.85
github.com/ClusterCockpit/cc-lib/v2 v2.2.2
github.com/ClusterCockpit/cc-lib/v2 v2.4.0
github.com/Masterminds/squirrel v1.5.4
github.com/aws/aws-sdk-go-v2 v1.41.1
github.com/aws/aws-sdk-go-v2/config v1.32.6

go.sum
@@ -4,10 +4,8 @@ github.com/99designs/gqlgen v0.17.85 h1:EkGx3U2FDcxQm8YDLQSpXIAVmpDyZ3IcBMOJi2nH
github.com/99designs/gqlgen v0.17.85/go.mod h1:yvs8s0bkQlRfqg03YXr3eR4OQUowVhODT/tHzCXnbOU=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
github.com/ClusterCockpit/cc-lib/v2 v2.2.1 h1:iCVas+Jc61zFH5S2VG3H1sc7tsn+U4lOJwUYjYZEims=
github.com/ClusterCockpit/cc-lib/v2 v2.2.1/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw=
github.com/ClusterCockpit/cc-lib/v2 v2.2.2 h1:ye4RY57I19c2cXr3XWZBS/QYYgQVeGFvsiu5HkyKq9E=
github.com/ClusterCockpit/cc-lib/v2 v2.2.2/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw=
github.com/ClusterCockpit/cc-lib/v2 v2.4.0 h1:OnZlvqSatg7yCQ2NtSR7AddpUVSiuSMZ8scF1a7nfOk=
github.com/ClusterCockpit/cc-lib/v2 v2.4.0/go.mod h1:JuxMAuEOaLLNEnnL9U3ejha8kMvsSatLdKPZEgJw6iw=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
@@ -36,9 +36,9 @@ type GetClustersAPIResponse struct {
// @router /api/clusters/ [get]
func (api *RestAPI) getClusters(rw http.ResponseWriter, r *http.Request) {
if user := repository.GetUserFromContext(r.Context()); user != nil &&
!user.HasRole(schema.RoleApi) {
!user.HasRole(schema.RoleAPI) {

handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw)
handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleAPI)), http.StatusForbidden, rw)
return
}

@@ -1054,8 +1054,8 @@ type GetUsedNodesAPIResponse struct {
// @router /api/jobs/used_nodes [get]
func (api *RestAPI) getUsedNodes(rw http.ResponseWriter, r *http.Request) {
if user := repository.GetUserFromContext(r.Context()); user != nil &&
!user.HasRole(schema.RoleApi) {
handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleApi)), http.StatusForbidden, rw)
!user.HasRole(schema.RoleAPI) {
handleError(fmt.Errorf("missing role: %v", schema.GetRoleString(schema.RoleAPI)), http.StatusForbidden, rw)
return
}
@@ -80,7 +80,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
ms := metricstore.GetMemoryStore()

m := make(map[string][]string)
healthStates := make(map[string]schema.MonitoringState)
healthResults := make(map[string]metricstore.HealthCheckResult)

startMs := time.Now()

@@ -94,8 +94,8 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
if sc != "" {
metricList := archive.GetMetricConfigSubCluster(req.Cluster, sc)
metricNames := metricListToNames(metricList)
if states, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil {
maps.Copy(healthStates, states)
if results, err := ms.HealthCheck(req.Cluster, nl, metricNames); err == nil {
maps.Copy(healthResults, results)
}
}
}

@@ -106,8 +106,10 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
for _, node := range req.Nodes {
state := determineState(node.States)
healthState := schema.MonitoringStateFailed
if hs, ok := healthStates[node.Hostname]; ok {
healthState = hs
var healthMetrics string
if result, ok := healthResults[node.Hostname]; ok {
healthState = result.State
healthMetrics = result.HealthMetrics
}
nodeState := schema.NodeStateDB{
TimeStamp: requestReceived,
@@ -116,10 +118,14 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) {
MemoryAllocated: node.MemoryAllocated,
GpusAllocated: node.GpusAllocated,
HealthState: healthState,
HealthMetrics: healthMetrics,
JobsRunning: node.JobsRunning,
}

repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState)
if err := repo.UpdateNodeState(node.Hostname, req.Cluster, &nodeState); err != nil {
cclog.Errorf("updateNodeStates: updating node state for %s on %s failed: %v",
node.Hostname, req.Cluster, err)
}
}

cclog.Debugf("Timer updateNodeStates, SQLite Inserts: %s", time.Since(startDB))
@@ -164,7 +164,7 @@ func (api *RestAPI) createUser(rw http.ResponseWriter, r *http.Request) {
return
}

if len(password) == 0 && role != schema.GetRoleString(schema.RoleApi) {
if len(password) == 0 && role != schema.GetRoleString(schema.RoleAPI) {
handleError(fmt.Errorf("only API users are allowed to have a blank password (login will be impossible)"), http.StatusBadRequest, rw)
return
}
@@ -448,13 +448,13 @@ func (auth *Authentication) AuthAPI(
if user != nil {
switch {
case len(user.Roles) == 1:
if user.HasRole(schema.RoleApi) {
if user.HasRole(schema.RoleAPI) {
ctx := context.WithValue(r.Context(), repository.ContextUserKey, user)
onsuccess.ServeHTTP(rw, r.WithContext(ctx))
return
}
case len(user.Roles) >= 2:
if user.HasAllRoles([]schema.Role{schema.RoleAdmin, schema.RoleApi}) {
if user.HasAllRoles([]schema.Role{schema.RoleAdmin, schema.RoleAPI}) {
ctx := context.WithValue(r.Context(), repository.ContextUserKey, user)
onsuccess.ServeHTTP(rw, r.WithContext(ctx))
return

@@ -484,13 +484,13 @@ func (auth *Authentication) AuthUserAPI(
if user != nil {
switch {
case len(user.Roles) == 1:
if user.HasRole(schema.RoleApi) {
if user.HasRole(schema.RoleAPI) {
ctx := context.WithValue(r.Context(), repository.ContextUserKey, user)
onsuccess.ServeHTTP(rw, r.WithContext(ctx))
return
}
case len(user.Roles) >= 2:
if user.HasRole(schema.RoleApi) && user.HasAnyRole([]schema.Role{schema.RoleUser, schema.RoleManager, schema.RoleSupport, schema.RoleAdmin}) {
if user.HasRole(schema.RoleAPI) && user.HasAnyRole([]schema.Role{schema.RoleUser, schema.RoleManager, schema.RoleSupport, schema.RoleAdmin}) {
ctx := context.WithValue(r.Context(), repository.ContextUserKey, user)
onsuccess.ServeHTTP(rw, r.WithContext(ctx))
return

@@ -520,13 +520,13 @@ func (auth *Authentication) AuthMetricStoreAPI(
if user != nil {
switch {
case len(user.Roles) == 1:
if user.HasRole(schema.RoleApi) {
if user.HasRole(schema.RoleAPI) {
ctx := context.WithValue(r.Context(), repository.ContextUserKey, user)
onsuccess.ServeHTTP(rw, r.WithContext(ctx))
return
}
case len(user.Roles) >= 2:
if user.HasRole(schema.RoleApi) && user.HasAnyRole([]schema.Role{schema.RoleUser, schema.RoleManager, schema.RoleAdmin}) {
if user.HasRole(schema.RoleAPI) && user.HasAnyRole([]schema.Role{schema.RoleUser, schema.RoleManager, schema.RoleAdmin}) {
ctx := context.WithValue(r.Context(), repository.ContextUserKey, user)
onsuccess.ServeHTTP(rw, r.WithContext(ctx))
return
@@ -71,6 +71,23 @@ type ProgramConfig struct {

// If exists, will enable dynamic zoom in frontend metric plots using the configured values
EnableResampling *ResampleConfig `json:"resampling"`

// Node state retention configuration
NodeStateRetention *NodeStateRetention `json:"nodestate-retention"`
}

type NodeStateRetention struct {
Policy string `json:"policy"` // "delete" or "parquet"
Age int `json:"age"` // hours, default 24
TargetKind string `json:"target-kind"` // "file" or "s3"
TargetPath string `json:"target-path"`
TargetEndpoint string `json:"target-endpoint"`
TargetBucket string `json:"target-bucket"`
TargetAccessKey string `json:"target-access-key"`
TargetSecretKey string `json:"target-secret-key"`
TargetRegion string `json:"target-region"`
TargetUsePathStyle bool `json:"target-use-path-style"`
MaxFileSizeMB int `json:"max-file-size-mb"`
}

type ResampleConfig struct {
@@ -130,6 +130,59 @@ var configSchema = `
}
},
"required": ["subject-job-event", "subject-node-state"]
},
"nodestate-retention": {
"description": "Node state retention configuration for cleaning up old node_state rows.",
"type": "object",
"properties": {
"policy": {
"description": "Retention policy: 'delete' to remove old rows, 'parquet' to archive then delete.",
"type": "string",
"enum": ["delete", "parquet"]
},
"age": {
"description": "Retention age in hours (default: 24).",
"type": "integer"
},
"target-kind": {
"description": "Target kind for parquet archiving: 'file' or 's3'.",
"type": "string",
"enum": ["file", "s3"]
},
"target-path": {
"description": "Filesystem path for parquet file target.",
"type": "string"
},
"target-endpoint": {
"description": "S3 endpoint URL.",
"type": "string"
},
"target-bucket": {
"description": "S3 bucket name.",
"type": "string"
},
"target-access-key": {
"description": "S3 access key.",
"type": "string"
},
"target-secret-key": {
"description": "S3 secret key.",
"type": "string"
},
"target-region": {
"description": "S3 region.",
"type": "string"
},
"target-use-path-style": {
"description": "Use path-style S3 addressing.",
"type": "boolean"
},
"max-file-size-mb": {
"description": "Maximum parquet file size in MB (default: 128).",
"type": "integer"
}
},
"required": ["policy"]
}
}
}`
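For reference, a config block matching the new schema might look like the following. This is a hedged sketch: only the keys come from the schema above, while the archive path value and the placement inside config.json are illustrative assumptions.

"nodestate-retention": {
  "policy": "parquet",
  "age": 24,
  "target-kind": "file",
  "target-path": "/var/lib/cc-backend/nodestate-archive",
  "max-file-size-mb": 128
}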
@@ -10245,7 +10245,7 @@ func (ec *executionContext) _Series_id(ctx context.Context, field graphql.Collec
field,
ec.fieldContext_Series_id,
func(ctx context.Context) (any, error) {
return obj.Id, nil
return obj.ID, nil
},
nil,
ec.marshalOString2ᚖstring,

@@ -552,7 +552,7 @@ func (r *queryResolver) ScopedJobStats(ctx context.Context, id string, metrics [
for _, stat := range stats {
mdlStats = append(mdlStats, &model.ScopedStats{
Hostname: stat.Hostname,
ID: stat.Id,
ID: stat.ID,
Data: stat.Data,
})
}
@@ -824,6 +824,7 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
}

nodeRepo := repository.GetNodeRepository()
// nodes -> array hostname
nodes, stateMap, countNodes, hasNextPage, nerr := nodeRepo.GetNodesForList(ctx, cluster, subCluster, stateFilter, nodeFilter, page)
if nerr != nil {
return nil, errors.New("could not retrieve node list required for resolving NodeMetricsList")
@@ -835,6 +836,7 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
}
}

// data -> map hostname:jobdata
data, err := metricdispatch.LoadNodeListData(cluster, subCluster, nodes, metrics, scopes, *resolution, from, to, ctx)
if err != nil {
cclog.Warn("error while loading node data (Resolver.NodeMetricsList")
@@ -842,18 +844,18 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
}

nodeMetricsList := make([]*model.NodeMetrics, 0, len(data))
for hostname, metrics := range data {
for _, hostname := range nodes {
host := &model.NodeMetrics{
Host: hostname,
State: stateMap[hostname],
Metrics: make([]*model.JobMetricWithName, 0, len(metrics)*len(scopes)),
Metrics: make([]*model.JobMetricWithName, 0),
}
host.SubCluster, err = archive.GetSubClusterByNode(cluster, hostname)
if err != nil {
cclog.Warnf("error in nodeMetrics resolver: %s", err)
}

for metric, scopedMetrics := range metrics {
for metric, scopedMetrics := range data[hostname] {
for scope, scopedMetric := range scopedMetrics {
host.Metrics = append(host.Metrics, &model.JobMetricWithName{
Name: metric,
@@ -867,7 +869,8 @@ func (r *queryResolver) NodeMetricsList(ctx context.Context, cluster string, sub
}

nodeMetricsListResult := &model.NodesResultList{
Items: nodeMetricsList,
Items: nodeMetricsList,
// TotalNodes depends on sum of nodes grouped on latest timestamp, see repo/node.go:357
TotalNodes: &countNodes,
HasNextPage: &hasNextPage,
}
@@ -499,7 +499,7 @@ func copyJobMetric(src *schema.JobMetric) *schema.JobMetric {
func copySeries(src *schema.Series) schema.Series {
dst := schema.Series{
Hostname: src.Hostname,
Id: src.Id,
ID: src.ID,
Statistics: src.Statistics,
Data: make([]schema.Float, len(src.Data)),
}

@@ -21,7 +21,7 @@ func TestDeepCopy(t *testing.T) {
Series: []schema.Series{
{
Hostname: "node001",
Id: &nodeId,
ID: &nodeId,
Data: []schema.Float{1.0, 2.0, 3.0},
Statistics: schema.MetricStatistics{
Min: 1.0,
@@ -267,7 +267,7 @@ func (ccms *CCMetricStore) LoadData(

jobMetric.Series = append(jobMetric.Series, schema.Series{
Hostname: query.Hostname,
Id: id,
ID: id,
Statistics: schema.MetricStatistics{
Avg: float64(res.Avg),
Min: float64(res.Min),

@@ -419,7 +419,7 @@ func (ccms *CCMetricStore) LoadScopedStats(

scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
Hostname: query.Hostname,
Id: id,
ID: id,
Data: &schema.MetricStatistics{
Avg: float64(res.Avg),
Min: float64(res.Min),

@@ -634,7 +634,7 @@ func (ccms *CCMetricStore) LoadNodeListData(

scopeData.Series = append(scopeData.Series, schema.Series{
Hostname: query.Hostname,
Id: id,
ID: id,
Statistics: schema.MetricStatistics{
Avg: float64(res.Avg),
Min: float64(res.Min),
@@ -150,7 +150,7 @@ func SecurityCheckWithUser(user *schema.User, query sq.SelectBuilder) (sq.Select
}

switch {
case len(user.Roles) == 1 && user.HasRole(schema.RoleApi):
case len(user.Roles) == 1 && user.HasRole(schema.RoleAPI):
return query, nil
case user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}):
return query, nil
@@ -23,6 +23,7 @@ CREATE TABLE "node_state" (
CHECK (health_state IN (
'full', 'partial', 'failed'
)),
health_metrics TEXT, -- JSON array of strings
node_id INTEGER,
FOREIGN KEY (node_id) REFERENCES node (id)
);
@@ -169,9 +169,10 @@ func (r *NodeRepository) AddNode(node *schema.NodeDB) (int64, error) {
}

const NamedNodeStateInsert string = `
INSERT INTO node_state (time_stamp, node_state, health_state, cpus_allocated,
memory_allocated, gpus_allocated, jobs_running, node_id)
VALUES (:time_stamp, :node_state, :health_state, :cpus_allocated, :memory_allocated, :gpus_allocated, :jobs_running, :node_id);`
INSERT INTO node_state (time_stamp, node_state, health_state, health_metrics,
cpus_allocated, memory_allocated, gpus_allocated, jobs_running, node_id)
VALUES (:time_stamp, :node_state, :health_state, :health_metrics,
:cpus_allocated, :memory_allocated, :gpus_allocated, :jobs_running, :node_id);`

// TODO: Add real Monitoring Health State
@@ -224,6 +225,75 @@ func (r *NodeRepository) UpdateNodeState(hostname string, cluster string, nodeSt
// return nil
// }

// NodeStateWithNode combines a node state row with denormalized node info.
type NodeStateWithNode struct {
ID int64 `db:"id"`
TimeStamp int64 `db:"time_stamp"`
NodeState string `db:"node_state"`
HealthState string `db:"health_state"`
HealthMetrics string `db:"health_metrics"`
CpusAllocated int `db:"cpus_allocated"`
MemoryAllocated int64 `db:"memory_allocated"`
GpusAllocated int `db:"gpus_allocated"`
JobsRunning int `db:"jobs_running"`
Hostname string `db:"hostname"`
Cluster string `db:"cluster"`
SubCluster string `db:"subcluster"`
}

// FindNodeStatesBefore returns all node_state rows with time_stamp < cutoff,
// joined with node info for denormalized archiving.
func (r *NodeRepository) FindNodeStatesBefore(cutoff int64) ([]NodeStateWithNode, error) {
rows, err := sq.Select(
"node_state.id", "node_state.time_stamp", "node_state.node_state",
"node_state.health_state", "node_state.health_metrics",
"node_state.cpus_allocated", "node_state.memory_allocated",
"node_state.gpus_allocated", "node_state.jobs_running",
"node.hostname", "node.cluster", "node.subcluster",
).
From("node_state").
Join("node ON node_state.node_id = node.id").
Where(sq.Lt{"node_state.time_stamp": cutoff}).
Where("node_state.id NOT IN (SELECT ns2.id FROM node_state ns2 WHERE ns2.time_stamp = (SELECT MAX(ns3.time_stamp) FROM node_state ns3 WHERE ns3.node_id = ns2.node_id))").
OrderBy("node_state.time_stamp ASC").
RunWith(r.DB).Query()
if err != nil {
return nil, err
}
defer rows.Close()

var result []NodeStateWithNode
for rows.Next() {
var ns NodeStateWithNode
if err := rows.Scan(&ns.ID, &ns.TimeStamp, &ns.NodeState,
&ns.HealthState, &ns.HealthMetrics,
&ns.CpusAllocated, &ns.MemoryAllocated,
&ns.GpusAllocated, &ns.JobsRunning,
&ns.Hostname, &ns.Cluster, &ns.SubCluster); err != nil {
return nil, err
}
result = append(result, ns)
}
return result, nil
}

// DeleteNodeStatesBefore removes node_state rows with time_stamp < cutoff,
// but always preserves the row with the latest timestamp per node_id.
func (r *NodeRepository) DeleteNodeStatesBefore(cutoff int64) (int64, error) {
res, err := r.DB.Exec(
`DELETE FROM node_state WHERE time_stamp < ?
AND id NOT IN (
SELECT id FROM node_state ns2
WHERE ns2.time_stamp = (SELECT MAX(ns3.time_stamp) FROM node_state ns3 WHERE ns3.node_id = ns2.node_id)
)`,
cutoff,
)
if err != nil {
return 0, err
}
return res.RowsAffected()
}

func (r *NodeRepository) DeleteNode(id int64) error {
_, err := r.DB.Exec(`DELETE FROM node WHERE node.id = ?`, id)
if err != nil {
@@ -263,14 +333,16 @@ func (r *NodeRepository) QueryNodes(
if f.SchedulerState != nil {
query = query.Where("node_state = ?", f.SchedulerState)
// Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned
// TODO: Hardcoded TimeDiff Suboptimal - Use Config Option?
now := time.Now().Unix()
query = query.Where(sq.Gt{"time_stamp": (now - 60)})
query = query.Where(sq.Gt{"time_stamp": (now - 300)})
}
if f.HealthState != nil {
query = query.Where("health_state = ?", f.HealthState)
// Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned
// TODO: Hardcoded TimeDiff Suboptimal - Use Config Option?
now := time.Now().Unix()
query = query.Where(sq.Gt{"time_stamp": (now - 60)})
query = query.Where(sq.Gt{"time_stamp": (now - 300)})
}
}

@@ -331,14 +403,16 @@ func (r *NodeRepository) CountNodes(
if f.SchedulerState != nil {
query = query.Where("node_state = ?", f.SchedulerState)
// Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned
// TODO: Hardcoded TimeDiff Suboptimal - Use Config Option?
now := time.Now().Unix()
query = query.Where(sq.Gt{"time_stamp": (now - 60)})
query = query.Where(sq.Gt{"time_stamp": (now - 300)})
}
if f.HealthState != nil {
query = query.Where("health_state = ?", f.HealthState)
// Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned
// TODO: Hardcoded TimeDiff Suboptimal - Use Config Option?
now := time.Now().Unix()
query = query.Where(sq.Gt{"time_stamp": (now - 60)})
query = query.Where(sq.Gt{"time_stamp": (now - 300)})
}
}
@@ -156,8 +156,12 @@ func nodeTestSetup(t *testing.T) {
func TestUpdateNodeState(t *testing.T) {
nodeTestSetup(t)

repo := GetNodeRepository()
now := time.Now().Unix()

nodeState := schema.NodeStateDB{
TimeStamp: time.Now().Unix(), NodeState: "allocated",
TimeStamp: now,
NodeState: "allocated",
CpusAllocated: 72,
MemoryAllocated: 480,
GpusAllocated: 0,
@@ -165,18 +169,152 @@ func TestUpdateNodeState(t *testing.T) {
JobsRunning: 1,
}

repo := GetNodeRepository()
err := repo.UpdateNodeState("host124", "testcluster", &nodeState)
if err != nil {
return
t.Fatal(err)
}

node, err := repo.GetNode("host124", "testcluster", false)
if err != nil {
return
t.Fatal(err)
}

if node.NodeState != "allocated" {
t.Errorf("wrong node state\ngot: %s \nwant: allocated ", node.NodeState)
}

t.Run("FindBeforeEmpty", func(t *testing.T) {
// Only the current-timestamp row exists, so nothing should be found before now
rows, err := repo.FindNodeStatesBefore(now)
if err != nil {
t.Fatal(err)
}
if len(rows) != 0 {
t.Errorf("expected 0 rows, got %d", len(rows))
}
})

t.Run("DeleteOldRows", func(t *testing.T) {
// Insert 2 more old rows for host124
for i, ts := range []int64{now - 7200, now - 3600} {
ns := schema.NodeStateDB{
TimeStamp: ts,
NodeState: "allocated",
HealthState: schema.MonitoringStateFull,
CpusAllocated: 72,
MemoryAllocated: 480,
JobsRunning: i,
}
if err := repo.UpdateNodeState("host124", "testcluster", &ns); err != nil {
t.Fatal(err)
}
}

// Delete rows older than 30 minutes
cutoff := now - 1800
cnt, err := repo.DeleteNodeStatesBefore(cutoff)
if err != nil {
t.Fatal(err)
}

// Should delete the 2 old rows
if cnt != 2 {
t.Errorf("expected 2 deleted rows, got %d", cnt)
}

// Latest row should still exist
node, err := repo.GetNode("host124", "testcluster", false)
if err != nil {
t.Fatal(err)
}
if node.NodeState != "allocated" {
t.Errorf("expected node state 'allocated', got %s", node.NodeState)
}
})

t.Run("PreservesLatestPerNode", func(t *testing.T) {
// Insert a single old row for host125 — it's the latest per node so it must survive
ns := schema.NodeStateDB{
TimeStamp: now - 7200,
NodeState: "idle",
HealthState: schema.MonitoringStateFull,
CpusAllocated: 0,
MemoryAllocated: 0,
JobsRunning: 0,
}
if err := repo.UpdateNodeState("host125", "testcluster", &ns); err != nil {
t.Fatal(err)
}

// Delete everything older than now — the latest per node should be preserved
_, err := repo.DeleteNodeStatesBefore(now)
if err != nil {
t.Fatal(err)
}

// The latest row for host125 must still exist
node, err := repo.GetNode("host125", "testcluster", false)
if err != nil {
t.Fatal(err)
}
if node.NodeState != "idle" {
t.Errorf("expected node state 'idle', got %s", node.NodeState)
}

// Verify exactly 1 row remains for host125
var countAfter int
if err := repo.DB.QueryRow(
"SELECT COUNT(*) FROM node_state WHERE node_id = (SELECT id FROM node WHERE hostname = 'host125')").
Scan(&countAfter); err != nil {
t.Fatal(err)
}
if countAfter != 1 {
t.Errorf("expected 1 row remaining for host125, got %d", countAfter)
}
})

t.Run("FindBeforeWithJoin", func(t *testing.T) {
// Insert old and current rows for host123
for _, ts := range []int64{now - 7200, now} {
ns := schema.NodeStateDB{
TimeStamp: ts,
NodeState: "allocated",
HealthState: schema.MonitoringStateFull,
CpusAllocated: 8,
MemoryAllocated: 1024,
GpusAllocated: 1,
JobsRunning: 1,
}
if err := repo.UpdateNodeState("host123", "testcluster", &ns); err != nil {
t.Fatal(err)
}
}

// Find rows older than 30 minutes, excluding latest per node
cutoff := now - 1800
rows, err := repo.FindNodeStatesBefore(cutoff)
if err != nil {
t.Fatal(err)
}

// Should find the old host123 row
found := false
for _, row := range rows {
if row.Hostname == "host123" && row.TimeStamp == now-7200 {
found = true
if row.Cluster != "testcluster" {
t.Errorf("expected cluster 'testcluster', got %s", row.Cluster)
}
if row.SubCluster != "sc1" {
t.Errorf("expected subcluster 'sc1', got %s", row.SubCluster)
}
if row.CpusAllocated != 8 {
t.Errorf("expected cpus_allocated 8, got %d", row.CpusAllocated)
}
}
}
if !found {
t.Errorf("expected to find old host123 row among %d results", len(rows))
}
})
}
@@ -644,12 +644,12 @@ func (r *JobRepository) checkScopeAuth(user *schema.User, operation string, scop
if user != nil {
switch {
case operation == "write" && scope == "admin":
if user.HasRole(schema.RoleAdmin) || (len(user.Roles) == 1 && user.HasRole(schema.RoleApi)) {
if user.HasRole(schema.RoleAdmin) || (len(user.Roles) == 1 && user.HasRole(schema.RoleAPI)) {
return true, nil
}
return false, nil
case operation == "write" && scope == "global":
if user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}) || (len(user.Roles) == 1 && user.HasRole(schema.RoleApi)) {
if user.HasAnyRole([]schema.Role{schema.RoleAdmin, schema.RoleSupport}) || (len(user.Roles) == 1 && user.HasRole(schema.RoleAPI)) {
return true, nil
}
return false, nil
internal/taskmanager/nodestateRetentionService.go (new file, 120 lines)
@@ -0,0 +1,120 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package taskmanager

import (
"time"

"github.com/ClusterCockpit/cc-backend/internal/config"
"github.com/ClusterCockpit/cc-backend/internal/repository"
pqarchive "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
"github.com/go-co-op/gocron/v2"
)

func RegisterNodeStateRetentionDeleteService(ageHours int) {
cclog.Info("Register node state retention delete service")

s.NewJob(gocron.DurationJob(1*time.Hour),
gocron.NewTask(
func() {
cutoff := time.Now().Unix() - int64(ageHours*3600)
nodeRepo := repository.GetNodeRepository()
cnt, err := nodeRepo.DeleteNodeStatesBefore(cutoff)
if err != nil {
cclog.Errorf("NodeState retention: error deleting old rows: %v", err)
} else if cnt > 0 {
cclog.Infof("NodeState retention: deleted %d old rows", cnt)
}
}))
}

func RegisterNodeStateRetentionParquetService(cfg *config.NodeStateRetention) {
cclog.Info("Register node state retention parquet service")

maxFileSizeMB := cfg.MaxFileSizeMB
if maxFileSizeMB <= 0 {
maxFileSizeMB = 128
}

ageHours := cfg.Age
if ageHours <= 0 {
ageHours = 24
}

var target pqarchive.ParquetTarget
var err error

switch cfg.TargetKind {
case "s3":
target, err = pqarchive.NewS3Target(pqarchive.S3TargetConfig{
Endpoint: cfg.TargetEndpoint,
Bucket: cfg.TargetBucket,
AccessKey: cfg.TargetAccessKey,
SecretKey: cfg.TargetSecretKey,
Region: cfg.TargetRegion,
UsePathStyle: cfg.TargetUsePathStyle,
})
default:
target, err = pqarchive.NewFileTarget(cfg.TargetPath)
}

if err != nil {
cclog.Errorf("NodeState parquet retention: failed to create target: %v", err)
return
}

s.NewJob(gocron.DurationJob(1*time.Hour),
gocron.NewTask(
func() {
cutoff := time.Now().Unix() - int64(ageHours*3600)
nodeRepo := repository.GetNodeRepository()

rows, err := nodeRepo.FindNodeStatesBefore(cutoff)
if err != nil {
cclog.Errorf("NodeState parquet retention: error finding rows: %v", err)
return
}
if len(rows) == 0 {
return
}

cclog.Infof("NodeState parquet retention: archiving %d rows", len(rows))
pw := pqarchive.NewNodeStateParquetWriter(target, maxFileSizeMB)

for _, ns := range rows {
row := pqarchive.ParquetNodeStateRow{
TimeStamp: ns.TimeStamp,
NodeState: ns.NodeState,
HealthState: ns.HealthState,
HealthMetrics: ns.HealthMetrics,
CpusAllocated: int32(ns.CpusAllocated),
MemoryAllocated: ns.MemoryAllocated,
GpusAllocated: int32(ns.GpusAllocated),
JobsRunning: int32(ns.JobsRunning),
Hostname: ns.Hostname,
Cluster: ns.Cluster,
SubCluster: ns.SubCluster,
}
if err := pw.AddRow(row); err != nil {
cclog.Errorf("NodeState parquet retention: add row: %v", err)
continue
}
}

if err := pw.Close(); err != nil {
cclog.Errorf("NodeState parquet retention: close writer: %v", err)
return
}

cnt, err := nodeRepo.DeleteNodeStatesBefore(cutoff)
if err != nil {
cclog.Errorf("NodeState parquet retention: error deleting rows: %v", err)
} else {
cclog.Infof("NodeState parquet retention: deleted %d rows from db", cnt)
}
}))
}
@@ -144,9 +144,30 @@ func Start(cronCfg, archiveConfig json.RawMessage) {
RegisterUpdateDurationWorker()
RegisterCommitJobService()

if config.Keys.NodeStateRetention != nil && config.Keys.NodeStateRetention.Policy != "" {
initNodeStateRetention()
}

s.Start()
}

func initNodeStateRetention() {
cfg := config.Keys.NodeStateRetention
age := cfg.Age
if age <= 0 {
age = 24
}

switch cfg.Policy {
case "delete":
RegisterNodeStateRetentionDeleteService(age)
case "parquet":
RegisterNodeStateRetentionParquetService(cfg)
default:
cclog.Warnf("Unknown nodestate-retention policy: %s", cfg.Policy)
}
}

// Shutdown stops the task manager and its scheduler.
func Shutdown() {
if s != nil {
@@ -51,7 +51,7 @@ func DecodeJobStats(r io.Reader, k string) (schema.ScopedJobStats, error) {
for _, series := range jobMetric.Series {
scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
Hostname: series.Hostname,
Id: series.Id,
ID: series.ID,
Data: &series.Statistics,
})
}

@@ -81,7 +81,6 @@ func JobToParquetRow(meta *schema.Job, data *schema.JobData) (*ParquetJobRow, er
NumNodes: meta.NumNodes,
NumHWThreads: meta.NumHWThreads,
NumAcc: meta.NumAcc,
Exclusive: meta.Exclusive,
Energy: meta.Energy,
SMT: meta.SMT,
ResourcesJSON: resourcesJSON,
pkg/archive/parquet/nodestate_schema.go (new file, 20 lines)
@@ -0,0 +1,20 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package parquet

type ParquetNodeStateRow struct {
TimeStamp int64 `parquet:"time_stamp"`
NodeState string `parquet:"node_state"`
HealthState string `parquet:"health_state"`
HealthMetrics string `parquet:"health_metrics,optional"`
CpusAllocated int32 `parquet:"cpus_allocated"`
MemoryAllocated int64 `parquet:"memory_allocated"`
GpusAllocated int32 `parquet:"gpus_allocated"`
JobsRunning int32 `parquet:"jobs_running"`
Hostname string `parquet:"hostname"`
Cluster string `parquet:"cluster"`
SubCluster string `parquet:"subcluster"`
}
pkg/archive/parquet/nodestate_writer.go (new file, 104 lines)
@@ -0,0 +1,104 @@
// Copyright (C) NHR@FAU, University Erlangen-Nuremberg.
// All rights reserved. This file is part of cc-backend.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package parquet

import (
"bytes"
"fmt"
"time"

cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
pq "github.com/parquet-go/parquet-go"
)

// NodeStateParquetWriter batches ParquetNodeStateRows and flushes them to a target
// when the estimated size exceeds maxSizeBytes.
type NodeStateParquetWriter struct {
target ParquetTarget
maxSizeBytes int64
rows []ParquetNodeStateRow
currentSize int64
fileCounter int
datePrefix string
}

// NewNodeStateParquetWriter creates a new writer for node state parquet files.
func NewNodeStateParquetWriter(target ParquetTarget, maxSizeMB int) *NodeStateParquetWriter {
return &NodeStateParquetWriter{
target: target,
maxSizeBytes: int64(maxSizeMB) * 1024 * 1024,
datePrefix: time.Now().Format("2006-01-02"),
}
}

// AddRow adds a row to the current batch. If the estimated batch size
// exceeds the configured maximum, the batch is flushed first.
func (pw *NodeStateParquetWriter) AddRow(row ParquetNodeStateRow) error {
rowSize := estimateNodeStateRowSize(&row)

if pw.currentSize+rowSize > pw.maxSizeBytes && len(pw.rows) > 0 {
if err := pw.Flush(); err != nil {
return err
}
}

pw.rows = append(pw.rows, row)
pw.currentSize += rowSize
return nil
}

// Flush writes the current batch to a parquet file on the target.
func (pw *NodeStateParquetWriter) Flush() error {
if len(pw.rows) == 0 {
return nil
}

pw.fileCounter++
fileName := fmt.Sprintf("cc-nodestate-%s-%03d.parquet", pw.datePrefix, pw.fileCounter)

data, err := writeNodeStateParquetBytes(pw.rows)
if err != nil {
return fmt.Errorf("write parquet buffer: %w", err)
}

if err := pw.target.WriteFile(fileName, data); err != nil {
return fmt.Errorf("write parquet file %q: %w", fileName, err)
}

cclog.Infof("NodeState retention: wrote %s (%d rows, %d bytes)", fileName, len(pw.rows), len(data))
pw.rows = pw.rows[:0]
pw.currentSize = 0
return nil
}

// Close flushes any remaining rows and finalizes the writer.
func (pw *NodeStateParquetWriter) Close() error {
return pw.Flush()
}

func writeNodeStateParquetBytes(rows []ParquetNodeStateRow) ([]byte, error) {
var buf bytes.Buffer

writer := pq.NewGenericWriter[ParquetNodeStateRow](&buf,
pq.Compression(&pq.Snappy),
)

if _, err := writer.Write(rows); err != nil {
return nil, err
}
if err := writer.Close(); err != nil {
return nil, err
}

return buf.Bytes(), nil
}

func estimateNodeStateRowSize(row *ParquetNodeStateRow) int64 {
size := int64(100) // fixed numeric fields
size += int64(len(row.NodeState) + len(row.HealthState) + len(row.HealthMetrics))
size += int64(len(row.Hostname) + len(row.Cluster) + len(row.SubCluster))
return size
}
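A minimal usage sketch of the new writer, assuming the package is imported as pqarchive as in the retention service above; the archive path and row values are illustrative only and not part of the change:

import (
	"time"

	pqarchive "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet"
)

func archiveOneRow() error {
	// Hypothetical local directory target; NewFileTarget is the same constructor
	// the retention service uses for the "file" target kind.
	target, err := pqarchive.NewFileTarget("/tmp/nodestate-archive")
	if err != nil {
		return err
	}
	pw := pqarchive.NewNodeStateParquetWriter(target, 128) // flush files at roughly 128 MB
	if err := pw.AddRow(pqarchive.ParquetNodeStateRow{
		TimeStamp:   time.Now().Unix(),
		NodeState:   "allocated",
		HealthState: "full",
		Hostname:    "node001",
		Cluster:     "testcluster",
		SubCluster:  "sc1",
	}); err != nil {
		return err
	}
	return pw.Close() // Close flushes any buffered rows to the target
}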
@@ -47,7 +47,6 @@ func makeTestJob(jobID int64) (*schema.Job, *schema.JobData) {
Walltime: 7200,
NumNodes: 2,
NumHWThreads: 16,
Exclusive: 1,
SMT: 1,
Resources: []*schema.Resource{
{Hostname: "node001"},
@@ -6,6 +6,7 @@
package metricstore

import (
"encoding/json"
"fmt"
"time"

@@ -19,6 +20,13 @@ type HealthCheckResponse struct {
Error error
}

// HealthCheckResult holds the monitoring state and raw JSON health metrics
// for a single node as determined by HealthCheck.
type HealthCheckResult struct {
State schema.MonitoringState
HealthMetrics string // JSON: {"missing":[...],"degraded":[...]}
}

// MaxMissingDataPoints is the threshold for stale data detection.
// A buffer is considered healthy if the gap between its last data point
// and the current time is within MaxMissingDataPoints * frequency.
@@ -134,15 +142,15 @@ func (m *MemoryStore) GetHealthyMetrics(selector []string, expectedMetrics []str
// - MonitoringStateFailed: node not found, or no healthy metrics at all
func (m *MemoryStore) HealthCheck(cluster string,
nodes []string, expectedMetrics []string,
) (map[string]schema.MonitoringState, error) {
results := make(map[string]schema.MonitoringState, len(nodes))
) (map[string]HealthCheckResult, error) {
results := make(map[string]HealthCheckResult, len(nodes))

for _, hostname := range nodes {
selector := []string{cluster, hostname}

degradedList, missingList, err := m.GetHealthyMetrics(selector, expectedMetrics)
if err != nil {
results[hostname] = schema.MonitoringStateFailed
results[hostname] = HealthCheckResult{State: schema.MonitoringStateFailed}
continue
}

@@ -158,13 +166,24 @@ func (m *MemoryStore) HealthCheck(cluster string,
cclog.ComponentInfo("metricstore", "HealthCheck: node ", hostname, "missing metrics:", missingList)
}

var state schema.MonitoringState
switch {
case degradedCount == 0 && missingCount == 0:
results[hostname] = schema.MonitoringStateFull
state = schema.MonitoringStateFull
case healthyCount == 0:
results[hostname] = schema.MonitoringStateFailed
state = schema.MonitoringStateFailed
default:
results[hostname] = schema.MonitoringStatePartial
state = schema.MonitoringStatePartial
}

hm, _ := json.Marshal(map[string][]string{
"missing": missingList,
"degraded": degradedList,
})

results[hostname] = HealthCheckResult{
State: state,
HealthMetrics: string(hm),
}
}

@@ -253,8 +253,8 @@ func TestHealthCheck(t *testing.T) {

// Check status
if wantStatus, ok := tt.wantStates[node]; ok {
if state != wantStatus {
t.Errorf("HealthCheck() node %s status = %v, want %v", node, state, wantStatus)
if state.State != wantStatus {
t.Errorf("HealthCheck() node %s status = %v, want %v", node, state.State, wantStatus)
}
}
}
@@ -149,7 +149,7 @@ func (ccms *InternalMetricStore) LoadData(

jobMetric.Series = append(jobMetric.Series, schema.Series{
Hostname: query.Hostname,
Id: id,
ID: id,
Statistics: schema.MetricStatistics{
Avg: float64(res.Avg),
Min: float64(res.Min),

@@ -651,7 +651,7 @@ func (ccms *InternalMetricStore) LoadScopedStats(

scopedJobStats[metric][scope] = append(scopedJobStats[metric][scope], &schema.ScopedStats{
Hostname: query.Hostname,
Id: id,
ID: id,
Data: &schema.MetricStatistics{
Avg: float64(res.Avg),
Min: float64(res.Min),

@@ -894,7 +894,7 @@ func (ccms *InternalMetricStore) LoadNodeListData(

scopeData.Series = append(scopeData.Series, schema.Series{
Hostname: query.Hostname,
Id: id,
ID: id,
Statistics: schema.MetricStatistics{
Avg: float64(res.Avg),
Min: float64(res.Min),
@@ -30,7 +30,7 @@
import {
init,
groupByScope,
checkMetricDisabled,
checkMetricAvailability,
} from "./generic/utils.js";
import Metric from "./job/Metric.svelte";
import MetricSelection from "./generic/select/MetricSelection.svelte";
@@ -151,17 +151,17 @@
}
return names;
}, []);

//
return metricNames.filter(
(metric) =>
!metrics.some((jm) => jm.name == metric) &&
selectedMetrics.includes(metric) &&
!checkMetricDisabled(
(checkMetricAvailability(
globalMetrics,
metric,
thisJob.cluster,
thisJob.subCluster,
),
) == "configured")
);
} else {
return []
@@ -212,7 +212,7 @@
inputMetrics.map((metric) => ({
metric: metric,
data: grouped.find((group) => group[0].name == metric),
disabled: checkMetricDisabled(
availability: checkMetricAvailability(
globalMetrics,
metric,
thisJob.cluster,
@@ -333,7 +333,17 @@
{:else if thisJob && $jobMetrics?.data?.scopedJobStats}
<!-- Note: Ignore '#snippet' Error in IDE -->
{#snippet gridContent(item)}
{#if item?.disabled}
{#if item.availability == "none"}
<Card color="light" class="mt-2">
<CardHeader class="mb-0">
<b>Metric not configured</b>
</CardHeader>
<CardBody>
<p>No datasets returned for <b>{item.metric}</b>.</p>
<p class="mb-1">Metric is not configured for cluster <b>{thisJob.cluster}</b>.</p>
</CardBody>
</Card>
{:else if item.availability == "disabled"}
<Card color="info" class="mt-2">
<CardHeader class="mb-0">
<b>Disabled Metric</b>
@@ -142,7 +142,8 @@
<Filters
bind:this={filterComponent}
{filterPresets}
shortJobQuickSelect
startTimeQuickSelect
shortJobQuickSelect={(filterBuffer.length > 0)}
shortJobCutoff={ccconfig?.jobList_hideShortRunningJobs}
showFilter={!showCompare}
matchedJobs={showCompare? matchedCompareJobs: matchedListJobs}
@@ -32,7 +32,7 @@
} from "@urql/svelte";
import {
init,
checkMetricDisabled,
checkMetricAvailability,
} from "./generic/utils.js";
import PlotGrid from "./generic/PlotGrid.svelte";
import MetricPlot from "./generic/plots/MetricPlot.svelte";
@@ -242,17 +242,17 @@
{item.name}
{systemUnits[item.name] ? "(" + systemUnits[item.name] + ")" : ""}
</h4>
{#if item.disabled === false && item.metric}
<MetricPlot
metric={item.name}
timestep={item.metric.timestep}
cluster={clusterInfos.find((c) => c.name == cluster)}
subCluster={$nodeMetricsData.data.nodeMetrics[0].subCluster}
series={item.metric.series}
enableFlip
forNode
/>
{:else if item.disabled === true && item.metric}
{#if item.availability == "none"}
<Card color="light" class="mx-2">
<CardHeader class="mb-0">
<b>Metric not configured</b>
</CardHeader>
<CardBody>
<p>No datasets returned for <b>{item.name}</b>.</p>
<p class="mb-1">Metric is not configured for cluster <b>{cluster}</b>.</p>
</CardBody>
</Card>
{:else if item.availability == "disabled"}
<Card color="info" class="mx-2">
<CardHeader class="mb-0">
<b>Disabled Metric</b>
@@ -262,6 +262,16 @@
<p class="mb-1">Metric has been disabled for subcluster <b>{$nodeMetricsData.data.nodeMetrics[0].subCluster}</b>.</p>
</CardBody>
</Card>
{:else if item?.metric}
<MetricPlot
metric={item.name}
timestep={item.metric.timestep}
cluster={clusterInfos.find((c) => c.name == cluster)}
subCluster={$nodeMetricsData.data.nodeMetrics[0].subCluster}
series={item.metric.series}
enableFlip
forNode
/>
{:else}
<Card color="warning" class="mx-2">
<CardHeader class="mb-0">
@@ -279,7 +289,7 @@
items={$nodeMetricsData.data.nodeMetrics[0].metrics
.map((m) => ({
...m,
disabled: checkMetricDisabled(
availability: checkMetricAvailability(
globalMetrics,
m.name,
cluster,
@@ -272,8 +272,8 @@
<NodeOverview {cluster} {ccconfig} {selectedMetric} {globalMetrics} {from} {to} {hostnameFilter} {hoststateFilter}/>
{:else}
<!-- ROW2-2: Node List (Grid Included)-->
<NodeList {cluster} {subCluster} {ccconfig} {globalMetrics}
pendingSelectedMetrics={selectedMetrics} {selectedResolution} {hostnameFilter} {hoststateFilter} {from} {to} {systemUnits}/>
<NodeList pendingSelectedMetrics={selectedMetrics} {cluster} {subCluster}
{selectedResolution} {hostnameFilter} {hoststateFilter} {from} {to} {systemUnits}/>
{/if}
{/if}
@@ -219,7 +219,8 @@
<Filters
bind:this={filterComponent}
{filterPresets}
shortJobQuickSelect
startTimeQuickSelect
shortJobQuickSelect={(filterBuffer.length > 0)}
shortJobCutoff={ccconfig?.jobList_hideShortRunningJobs}
showFilter={!showCompare}
matchedJobs={showCompare? matchedCompareJobs: matchedListJobs}

@@ -451,7 +451,7 @@

{#if filters.startTime.range}
<Info icon="calendar-range" onclick={() => (isStartTimeOpen = true)}>
{startTimeSelectOptions.find((stso) => stso.range === filters.startTime.range).rangeLabel }
Job Start: {startTimeSelectOptions.find((stso) => stso.range === filters.startTime.range).rangeLabel }
</Info>
{/if}
@@ -14,10 +14,10 @@
<script module>
export const startTimeSelectOptions = [
{ range: "", rangeLabel: "No Selection"},
{ range: "last6h", rangeLabel: "Job Start: Last 6hrs"},
{ range: "last24h", rangeLabel: "Job Start: Last 24hrs"},
{ range: "last7d", rangeLabel: "Job Start: Last 7 days"},
{ range: "last30d", rangeLabel: "Job Start: Last 30 days"}
{ range: "last6h", rangeLabel: "Last 6 hrs"},
{ range: "last24h", rangeLabel: "Last 24 hrs"},
{ range: "last7d", rangeLabel: "Last 7 days"},
{ range: "last30d", rangeLabel: "Last 30 days"}
];
</script>
@@ -19,7 +19,7 @@
<script>
import { queryStore, gql, getContextClient } from "@urql/svelte";
import { Card, Spinner } from "@sveltestrap/sveltestrap";
import { maxScope, checkMetricDisabled } from "../utils.js";
import { maxScope, checkMetricAvailability } from "../utils.js";
import JobInfo from "./JobInfo.svelte";
import MetricPlot from "../plots/MetricPlot.svelte";
import JobFootprint from "../helper/JobFootprint.svelte";
@@ -145,7 +145,7 @@
metricList.forEach((metricName) => {
const pendingMetric = {
name: metricName,
disabled: checkMetricDisabled(
availability: checkMetricAvailability(
globalMetrics,
metricName,
job.cluster,
@@ -207,7 +212,7 @@
{/if}
{#each refinedData as metric, i (metric?.name || i)}
<td>
{#if metric?.disabled}
{#if metric?.availability == "none"}
<Card body class="mx-2" color="light">
<p>No dataset(s) returned for <b>{metrics[i]}</b></p>
<p class="mb-1">Metric is not configured for cluster <b>{job.cluster}</b>.</p>
</Card>
{:else if metric?.availability == "disabled"}
<Card body class="mx-2" color="info">
<p>No dataset(s) returned for <b>{metrics[i]}</b></p>
<p class="mb-1">Metric has been disabled for subcluster <b>{job.subCluster}</b>.</p>
@@ -302,20 +302,36 @@ export function stickyHeader(datatableHeaderSelector, updatePading) {
onDestroy(() => document.removeEventListener("scroll", onscroll));
}

export function checkMetricDisabled(gm, m, c, s) { // [g]lobal[m]etrics, [m]etric, [c]luster, [s]ubcluster
const available = gm?.find((gm) => gm.name === m)?.availability?.find((av) => av.cluster === c)?.subClusters?.includes(s)
// Return inverse logic
return !available
export function checkMetricAvailability(gms, m, c, s = "") { // [g]lobal[m]etrics, [m]etric, [c]luster, [s]ubcluster
let pendingAvailability = "none"
const configured = gms?.find((gm) => gm.name === m)?.availability?.find((av) => av.cluster === c)
if (configured) {
pendingAvailability = "configured"
if (s != "") {
const enabled = configured.subClusters?.includes(s)
// Test inverse logic
if (!enabled) {
pendingAvailability = "disabled"
}
}
}
return pendingAvailability;
}

export function checkMetricsDisabled(gm, ma, c, s) { // [g]lobal[m]etrics, [m]etric[a]rray, [c]luster, [s]ubcluster
let result = {};
ma.forEach((m) => {
// Return named inverse logic: !available
result[m] = !(gm?.find((gm) => gm.name === m)?.availability?.find((av) => av.cluster === c)?.subClusters?.includes(s))
});
return result
}
// export function checkMetricDisabled(gm, m, c, s) { // [g]lobal[m]etrics, [m]etric, [c]luster, [s]ubcluster
// const available = gm?.find((gm) => gm.name === m)?.availability?.find((av) => av.cluster === c)?.subClusters?.includes(s)
// // Return inverse logic
// return !available
// }

// export function checkMetricsDisabled(gm, ma, c, s) { // [g]lobal[m]etrics, [m]etric[a]rray, [c]luster, [s]ubcluster
// let aresult = {};
// ma.forEach((m) => {
// // Return named inverse logic: !available
// aresult[m] = !(gm?.find((gm) => gm.name === m)?.availability?.find((av) => av.cluster === c)?.subClusters?.includes(s))
// });
// return aresult
// }

export function getStatsItems(presetStats = []) {
// console.time('stats')
@@ -35,6 +35,7 @@

/* Const Init */
const ccconfig = getContext("cc-config");
const globalMetrics = getContext("globalMetrics");
const client = getContextClient();

/* State Init */
@@ -139,6 +140,7 @@

<HistogramSelection
{cluster}
{globalMetrics}
bind:isOpen={isHistogramSelectionOpen}
presetSelectedHistograms={selectedHistograms}
configName="statusView_selectedHistograms"
@@ -4,8 +4,6 @@
Properties:
- `cluster String`: The nodes' cluster
- `subCluster String`: The nodes' subCluster [Default: ""]
- `ccconfig Object?`: The ClusterCockpit Config Context [Default: null]
- `globalMetrics [Obj]`: Includes the backend supplied availabilities for cluster and subCluster
- `pendingSelectedMetrics [String]`: The array of selected metrics [Default []]
- `selectedResolution Number?`: The selected data resolution [Default: 0]
- `hostnameFilter String?`: The active hostnamefilter [Default: ""]
@@ -16,7 +14,7 @@
-->

<script>
import { untrack } from "svelte";
import { untrack, getContext } from "svelte";
import { queryStore, gql, getContextClient, mutationStore } from "@urql/svelte";
import { Row, Col, Card, Table, Spinner } from "@sveltestrap/sveltestrap";
import { stickyHeader } from "../generic/utils.js";
@@ -27,8 +25,6 @@
let {
cluster,
subCluster = "",
ccconfig = null,
globalMetrics = null,
pendingSelectedMetrics = [],
selectedResolution = 0,
hostnameFilter = "",
@@ -99,10 +95,15 @@
let headerPaddingTop = $state(0);

/* Derived */
const initialized = $derived(getContext("initialized") || false);
const ccconfig = $derived(initialized ? getContext("cc-config") : null);
const globalMetrics = $derived(initialized ? getContext("globalMetrics") : null);
const usePaging = $derived(ccconfig ? ccconfig.nodeList_usePaging : false);

let selectedMetrics = $derived(pendingSelectedMetrics);
let itemsPerPage = $derived(usePaging ? (ccconfig?.nodeList_nodesPerPage || 10) : 10);
const usePaging = $derived(ccconfig?.nodeList_usePaging || false);
const paging = $derived({ itemsPerPage, page });
let paging = $derived({ itemsPerPage, page });

const nodesQuery = $derived(queryStore({
client: client,
query: nodeListQuery,
@@ -122,7 +123,7 @@
}));

const matchedNodes = $derived($nodesQuery?.data?.nodeMetricsList?.totalNodes || 0);

/* Effects */
$effect(() => {
if (!usePaging) {
@@ -15,7 +15,7 @@
<script>
import { queryStore, gql, getContextClient } from "@urql/svelte";
import { Row, Col, Card, CardHeader, CardBody, Spinner, Badge } from "@sveltestrap/sveltestrap";
import { checkMetricDisabled } from "../generic/utils.js";
import { checkMetricAvailability } from "../generic/utils.js";
import MetricPlot from "../generic/plots/MetricPlot.svelte";

/* Svelte 5 Props */
@@ -87,6 +87,7 @@
},
}));

const notConfigured = $derived(checkMetricAvailability(globalMetrics, selectedMetric, cluster) == "none");
const mappedData = $derived(handleQueryData($nodesQuery?.data));
const filteredData = $derived(mappedData.filter((h) => {
if (hostnameFilter) {
@@ -120,7 +121,7 @@
data: h.metrics.filter(
(m) => m?.name == selectedMetric && m.scope == "node",
),
disabled: checkMetricDisabled(globalMetrics, selectedMetric, cluster, h.subCluster),
availability: checkMetricAvailability(globalMetrics, selectedMetric, cluster, h.subCluster),
}))
.sort((a, b) => a.host.localeCompare(b.host))
}
@@ -161,7 +162,7 @@
</Badge>
</span>
</div>
{#if item?.disabled}
{#if item?.availability == "disabled"}
<Card color="info">
<CardHeader class="mb-0">
<b>Disabled Metric</b>
@@ -213,6 +214,18 @@
</CardBody>
</Card>
</Row>
{:else if notConfigured}
<Row class="mx-1">
<Card class="px-0" color="light">
<CardHeader>
<b>Metric not configured</b>
</CardHeader>
<CardBody>
<p>No datasets returned for <b>{selectedMetric}</b>.</p>
<p class="mb-1">Metric is not configured for cluster <b>{cluster}</b>.</p>
</CardBody>
</Card>
</Row>
{:else}
<Row class="mx-1">
<Card class="px-0" color="warning">
@@ -16,7 +16,7 @@
} from "@urql/svelte";
import uPlot from "uplot";
import { Card, CardBody, Spinner } from "@sveltestrap/sveltestrap";
import { maxScope, checkMetricDisabled, scramble, scrambleNames } from "../../generic/utils.js";
import { maxScope, checkMetricAvailability, scramble, scrambleNames } from "../../generic/utils.js";
import MetricPlot from "../../generic/plots/MetricPlot.svelte";
import NodeInfo from "./NodeInfo.svelte";

@@ -73,7 +73,7 @@

const extendedLegendData = $derived($nodeJobsData?.data ? buildExtendedLegend() : null);
const refinedData = $derived(nodeData?.metrics ? sortAndSelectScope(selectedMetrics, nodeData.metrics) : []);
const dataHealth = $derived(refinedData.filter((rd) => rd.disabled === false).map((enabled) => (enabled?.data?.metric?.series?.length > 0)));
const dataHealth = $derived(refinedData.filter((rd) => rd.availability == "configured").map((enabled) => (enabled?.data?.metric?.series?.length > 0)));

/* Functions */
function sortAndSelectScope(metricList = [], nodeMetrics = []) {
@@ -81,7 +81,7 @@
metricList.forEach((metricName) => {
const pendingMetric = {
name: metricName,
disabled: checkMetricDisabled(
availability: checkMetricAvailability(
globalMetrics,
metricName,
cluster,
@@ -130,23 +130,6 @@
return pendingExtendedLegendData;
}

/* Inspect */
// $inspect(selectedMetrics).with((type, selectedMetrics) => {
// console.log(type, 'selectedMetrics', selectedMetrics)
// });

// $inspect(nodeData).with((type, nodeData) => {
// console.log(type, 'nodeData', nodeData)
// });

// $inspect(refinedData).with((type, refinedData) => {
// console.log(type, 'refinedData', refinedData)
// });

// $inspect(dataHealth).with((type, dataHealth) => {
// console.log(type, 'dataHealth', dataHealth)
// });

</script>

<tr>
@@ -168,19 +151,25 @@
{/if}
</td>
{#each refinedData as metricData, i (metricData?.data?.name || i)}
<td>
{#if metricData?.disabled}
<Card body class="mx-2" color="info">
<p>No dataset(s) returned for <b>{selectedMetrics[i]}</b></p>
<p class="mb-1">Metric has been disabled for subcluster <b>{nodeData.subCluster}</b>.</p>
</Card>
{:else if !metricData?.data}
<Card body class="mx-2" color="warning">
<p>No dataset(s) returned for <b>{selectedMetrics[i]}</b></p>
<p class="mb-1">Metric was not found in metric store for cluster <b>{cluster}</b>.</p>
</Card>
{:else if !!metricData.data?.metric.statisticsSeries}
<!-- "No Data"-Warning included in MetricPlot-Component -->
{#key metricData}
<td>
{#if metricData?.availability == "none"}
<Card body class="mx-2" color="light">
<p>No dataset(s) returned for <b>{selectedMetrics[i]}</b></p>
<p class="mb-1">Metric is not configured for cluster <b>{cluster}</b>.</p>
</Card>
{:else if metricData?.availability == "disabled"}
<Card body class="mx-2" color="info">
<p>No dataset(s) returned for <b>{selectedMetrics[i]}</b></p>
<p class="mb-1">Metric has been disabled for subcluster <b>{nodeData.subCluster}</b>.</p>
</Card>
{:else if !metricData?.data}
<Card body class="mx-2" color="warning">
<p>No dataset(s) returned for <b>{selectedMetrics[i]}</b></p>
<p class="mb-1">Metric or host was not found in metric store for cluster <b>{cluster}</b>.</p>
</Card>
{:else if !!metricData.data?.metric.statisticsSeries}
<!-- "No Data"-Warning included in MetricPlot-Component -->
<MetricPlot
{cluster}
subCluster={nodeData.subCluster}
@@ -194,8 +183,7 @@
{plotSync}
forNode
/>
<div class="my-2"></div>
{#key extendedLegendData}
<div class="my-2"></div>
<MetricPlot
{cluster}
subCluster={nodeData.subCluster}
@@ -208,8 +196,7 @@
{plotSync}
forNode
/>
{/key}
{:else}
{:else}
<MetricPlot
{cluster}
subCluster={nodeData.subCluster}
@@ -220,7 +207,8 @@
height={375}
forNode
/>
{/if}
</td>
{/if}
</td>
{/key}
{/each}
</tr>