diff --git a/CLAUDE.md b/CLAUDE.md index 2148fdca..a8d56571 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -108,6 +108,7 @@ The backend follows a layered architecture with clear separation of concerns: - File system backend (default) - S3 backend - SQLite backend (experimental) + - **parquet** sub-package: Parquet format support (schema, reader, writer, conversion) - **internal/metricstoreclient**: Client for cc-metric-store queries ### Frontend Structure diff --git a/README.md b/README.md index d01c7140..3306f838 100644 --- a/README.md +++ b/README.md @@ -184,7 +184,8 @@ ln -s ./var/job-archive - [`tools/`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools) Additional command line helper tools. - [`archive-manager`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/archive-manager) - Commands for getting infos about an existing job archive. + Commands for getting infos about an existing job archive, importing jobs + between archive backends, and converting archives between JSON and Parquet formats. - [`archive-migration`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/archive-migration) Tool for migrating job archives between formats. - [`convert-pem-pubkey`](https://github.com/ClusterCockpit/cc-backend/tree/master/tools/convert-pem-pubkey) diff --git a/init/clustercockpit.service b/init/clustercockpit.service index b4ed8bfa..3c977e34 100644 --- a/init/clustercockpit.service +++ b/init/clustercockpit.service @@ -12,7 +12,7 @@ NotifyAccess=all Restart=on-failure RestartSec=30 TimeoutStopSec=100 -ExecStart=/opt/monitoring/cc-backend/cc-backend --config ./config.json +ExecStart=/opt/monitoring/cc-backend/cc-backend --config ./config.json --server [Install] WantedBy=multi-user.target diff --git a/internal/api/log.go b/internal/api/log.go new file mode 100644 index 00000000..90add9bb --- /dev/null +++ b/internal/api/log.go @@ -0,0 +1,165 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
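+
+// This file adds an admin-only log viewer endpoint: getJournalLog shells out
+// to journalctl for the configured systemd unit and returns matching entries
+// as JSON. Supported query parameters: "since" (journalctl time spec, default
+// "1 hour ago"), "lines" (1-1000, default 200), "level" (syslog priority 0-7)
+// and "search" (passed to journalctl --grep).
+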
+package api + +import ( + "bufio" + "encoding/json" + "fmt" + "net/http" + "os/exec" + "regexp" + "strconv" + "strings" + + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/repository" + cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +type LogEntry struct { + Timestamp string `json:"timestamp"` + Priority int `json:"priority"` + Message string `json:"message"` + Unit string `json:"unit"` +} + +var safePattern = regexp.MustCompile(`^[a-zA-Z0-9 :\-\.]+$`) + +func (api *RestAPI) getJournalLog(rw http.ResponseWriter, r *http.Request) { + user := repository.GetUserFromContext(r.Context()) + if !user.HasRole(schema.RoleAdmin) { + handleError(fmt.Errorf("only admins are allowed to view logs"), http.StatusForbidden, rw) + return + } + + since := r.URL.Query().Get("since") + if since == "" { + since = "1 hour ago" + } + if !safePattern.MatchString(since) { + handleError(fmt.Errorf("invalid 'since' parameter"), http.StatusBadRequest, rw) + return + } + + lines := 200 + if l := r.URL.Query().Get("lines"); l != "" { + n, err := strconv.Atoi(l) + if err != nil || n < 1 { + handleError(fmt.Errorf("invalid 'lines' parameter"), http.StatusBadRequest, rw) + return + } + if n > 1000 { + n = 1000 + } + lines = n + } + + unit := config.Keys.SystemdUnit + if unit == "" { + unit = "clustercockpit.service" + } + + args := []string{ + "--output=json", + "--no-pager", + "-n", fmt.Sprintf("%d", lines), + "--since", since, + "-u", unit, + } + + if level := r.URL.Query().Get("level"); level != "" { + n, err := strconv.Atoi(level) + if err != nil || n < 0 || n > 7 { + handleError(fmt.Errorf("invalid 'level' parameter (must be 0-7)"), http.StatusBadRequest, rw) + return + } + args = append(args, "--priority", fmt.Sprintf("%d", n)) + } + + if search := r.URL.Query().Get("search"); search != "" { + if !safePattern.MatchString(search) { + handleError(fmt.Errorf("invalid 'search' parameter"), http.StatusBadRequest, rw) + return + } + args = append(args, "--grep", search) + } + + cclog.Debugf("calling journalctl with %s", strings.Join(args, " ")) + cmd := exec.CommandContext(r.Context(), "journalctl", args...) 
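+	// journalctl --output=json prints one JSON object per line; the scanner
+	// below maps the journald fields PRIORITY, MESSAGE, __REALTIME_TIMESTAMP
+	// and _SYSTEMD_UNIT into LogEntry values. Illustrative (abridged) record:
+	//   {"PRIORITY":"6","MESSAGE":"...","__REALTIME_TIMESTAMP":"1700000000000000","_SYSTEMD_UNIT":"clustercockpit.service"}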
+ stdout, err := cmd.StdoutPipe() + if err != nil { + handleError(fmt.Errorf("failed to create pipe: %w", err), http.StatusInternalServerError, rw) + return + } + + if err := cmd.Start(); err != nil { + handleError(fmt.Errorf("failed to start journalctl: %w", err), http.StatusInternalServerError, rw) + return + } + + entries := make([]LogEntry, 0, lines) + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + var raw map[string]any + if err := json.Unmarshal(scanner.Bytes(), &raw); err != nil { + cclog.Debugf("error unmarshal log output: %v", err) + continue + } + + priority := 6 // default info + if p, ok := raw["PRIORITY"]; ok { + switch v := p.(type) { + case string: + if n, err := strconv.Atoi(v); err == nil { + priority = n + } + case float64: + priority = int(v) + } + } + + msg := "" + if m, ok := raw["MESSAGE"]; ok { + if s, ok := m.(string); ok { + msg = s + } + } + + ts := "" + if t, ok := raw["__REALTIME_TIMESTAMP"]; ok { + if s, ok := t.(string); ok { + ts = s + } + } + + unitName := "" + if u, ok := raw["_SYSTEMD_UNIT"]; ok { + if s, ok := u.(string); ok { + unitName = s + } + } + + entries = append(entries, LogEntry{ + Timestamp: ts, + Priority: priority, + Message: msg, + Unit: unitName, + }) + } + + if err := cmd.Wait(); err != nil { + // journalctl returns exit code 1 when --grep matches nothing + if len(entries) == 0 { + cclog.Debugf("journalctl exited with: %v", err) + } + } + + rw.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(rw).Encode(entries); err != nil { + cclog.Errorf("Failed to encode log entries: %v", err) + } +} diff --git a/internal/api/rest.go b/internal/api/rest.go index 575b1809..fe722511 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -158,6 +158,7 @@ func (api *RestAPI) MountConfigAPIRoutes(r chi.Router) { // MountFrontendAPIRoutes registers frontend-specific API endpoints. // These routes support JWT generation and user configuration updates with session authentication. 
 func (api *RestAPI) MountFrontendAPIRoutes(r chi.Router) {
+	r.Get("/logs/", api.getJournalLog)
 	// Settings Frontend Uses SessionAuth
 	if api.Authentication != nil {
 		r.Get("/jwt/", api.getJWT)
diff --git a/internal/config/config.go b/internal/config/config.go
index 2e601ed7..9de40695 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -72,14 +72,17 @@ type ProgramConfig struct {
 	// If exists, will enable dynamic zoom in frontend metric plots using the configured values
 	EnableResampling *ResampleConfig `json:"resampling"`
 
+	// Systemd unit name for log viewer (default: "clustercockpit.service")
+	SystemdUnit string `json:"systemd-unit"`
+
 	// Node state retention configuration
 	NodeStateRetention *NodeStateRetention `json:"nodestate-retention"`
 }
 
 type NodeStateRetention struct {
-	Policy     string `json:"policy"`      // "delete" or "parquet"
-	Age        int    `json:"age"`         // hours, default 24
-	TargetKind string `json:"target-kind"` // "file" or "s3"
+	Policy         string `json:"policy"`      // "delete" or "parquet"
+	Age            int    `json:"age"`         // hours, default 24
+	TargetKind     string `json:"target-kind"` // "file" or "s3"
 	TargetPath     string `json:"target-path"`
 	TargetEndpoint string `json:"target-endpoint"`
 	TargetBucket   string `json:"target-bucket"`
diff --git a/internal/repository/migrations/sqlite3/10_node-table.up.sql b/internal/repository/migrations/sqlite3/10_node-table.up.sql
index fd118f5d..b788a8a9 100644
--- a/internal/repository/migrations/sqlite3/10_node-table.up.sql
+++ b/internal/repository/migrations/sqlite3/10_node-table.up.sql
@@ -38,6 +38,7 @@ CREATE INDEX IF NOT EXISTS nodestates_state_timestamp ON node_state (node_state,
 CREATE INDEX IF NOT EXISTS nodestates_health_timestamp ON node_state (health_state, time_stamp);
 CREATE INDEX IF NOT EXISTS nodestates_nodeid_state ON node_state (node_id, node_state);
 CREATE INDEX IF NOT EXISTS nodestates_nodeid_health ON node_state (node_id, health_state);
+CREATE INDEX IF NOT EXISTS nodestates_nodeid_timestamp ON node_state (node_id, time_stamp DESC);
 
 -- Add NEW Indices For Increased Amounts of Tags
 CREATE INDEX IF NOT EXISTS tags_jobid ON jobtag (job_id);
diff --git a/internal/repository/node.go b/internal/repository/node.go
index 08a694c6..2ffe6698 100644
--- a/internal/repository/node.go
+++ b/internal/repository/node.go
@@ -52,6 +52,38 @@ func GetNodeRepository() *NodeRepository {
 	return nodeRepoInstance
 }
 
+// latestStateCondition returns a squirrel expression that restricts node_state
+// rows to the latest per node_id using a correlated subquery.
+// Requires the query to join node and node_state tables.
+func latestStateCondition() sq.Sqlizer {
+	return sq.Expr(
+		"node_state.id = (SELECT ns2.id FROM node_state ns2 WHERE ns2.node_id = node.id ORDER BY ns2.time_stamp DESC LIMIT 1)",
+	)
+}
+
+// applyNodeFilters applies common NodeFilter conditions to a query that joins
+// the node and node_state tables with latestStateCondition.
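+// Hostname, cluster and subcluster filters are delegated to buildStringCondition;
+// scheduler and health state filters compare directly against the latest
+// node_state row selected by latestStateCondition, so the previous hard-coded
+// "time_stamp > now-300" cutoff is no longer needed.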
+func applyNodeFilters(query sq.SelectBuilder, filters []*model.NodeFilter) sq.SelectBuilder { + for _, f := range filters { + if f.Cluster != nil { + query = buildStringCondition("node.cluster", f.Cluster, query) + } + if f.SubCluster != nil { + query = buildStringCondition("node.subcluster", f.SubCluster, query) + } + if f.Hostname != nil { + query = buildStringCondition("node.hostname", f.Hostname, query) + } + if f.SchedulerState != nil { + query = query.Where("node_state.node_state = ?", f.SchedulerState) + } + if f.HealthState != nil { + query = query.Where("node_state.health_state = ?", f.HealthState) + } + } + return query +} + func (r *NodeRepository) FetchMetadata(hostname string, cluster string) (map[string]string, error) { start := time.Now() @@ -82,17 +114,16 @@ func (r *NodeRepository) FetchMetadata(hostname string, cluster string) (map[str func (r *NodeRepository) GetNode(hostname string, cluster string, withMeta bool) (*schema.Node, error) { node := &schema.Node{} - var timestamp int - if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state", - "node_state.health_state", "MAX(node_state.time_stamp) as time"). - From("node_state"). - Join("node ON node_state.node_id = node.id"). + if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", + "node_state.node_state", "node_state.health_state"). + From("node"). + Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition()). Where("node.hostname = ?", hostname). Where("node.cluster = ?", cluster). - GroupBy("node_state.node_id"). RunWith(r.DB). - QueryRow().Scan(&node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState, &node.HealthState, ×tamp); err != nil { - cclog.Warnf("Error while querying node '%s' at time '%d' from database: %v", hostname, timestamp, err) + QueryRow().Scan(&node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState, &node.HealthState); err != nil { + cclog.Warnf("Error while querying node '%s' from database: %v", hostname, err) return nil, err } @@ -111,16 +142,15 @@ func (r *NodeRepository) GetNode(hostname string, cluster string, withMeta bool) func (r *NodeRepository) GetNodeByID(id int64, withMeta bool) (*schema.Node, error) { node := &schema.Node{} - var timestamp int - if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state", - "node_state.health_state", "MAX(node_state.time_stamp) as time"). - From("node_state"). - Join("node ON node_state.node_id = node.id"). + if err := sq.Select("node.hostname", "node.cluster", "node.subcluster", + "node_state.node_state", "node_state.health_state"). + From("node"). + Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition()). Where("node.id = ?", id). - GroupBy("node_state.node_id"). RunWith(r.DB). - QueryRow().Scan(&node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState, &node.HealthState, ×tamp); err != nil { - cclog.Warnf("Error while querying node ID '%d' at time '%d' from database: %v", id, timestamp, err) + QueryRow().Scan(&node.Hostname, &node.Cluster, &node.SubCluster, &node.NodeState, &node.HealthState); err != nil { + cclog.Warnf("Error while querying node ID '%d' from database: %v", id, err) return nil, err } @@ -313,40 +343,17 @@ func (r *NodeRepository) QueryNodes( order *model.OrderByInput, // Currently unused! ) ([]*schema.Node, error) { query, qerr := AccessCheck(ctx, - sq.Select("hostname", "cluster", "subcluster", "node_state", "health_state", "MAX(time_stamp) as time"). 
+ sq.Select("node.hostname", "node.cluster", "node.subcluster", + "node_state.node_state", "node_state.health_state"). From("node"). - Join("node_state ON node_state.node_id = node.id")) + Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition())) if qerr != nil { return nil, qerr } - for _, f := range filters { - if f.Cluster != nil { - query = buildStringCondition("cluster", f.Cluster, query) - } - if f.SubCluster != nil { - query = buildStringCondition("subcluster", f.SubCluster, query) - } - if f.Hostname != nil { - query = buildStringCondition("hostname", f.Hostname, query) - } - if f.SchedulerState != nil { - query = query.Where("node_state = ?", f.SchedulerState) - // Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned - // TODO: Hardcoded TimeDiff Suboptimal - Use Config Option? - now := time.Now().Unix() - query = query.Where(sq.Gt{"time_stamp": (now - 300)}) - } - if f.HealthState != nil { - query = query.Where("health_state = ?", f.HealthState) - // Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned - // TODO: Hardcoded TimeDiff Suboptimal - Use Config Option? - now := time.Now().Unix() - query = query.Where(sq.Gt{"time_stamp": (now - 300)}) - } - } - - query = query.GroupBy("node_id").OrderBy("hostname ASC") + query = applyNodeFilters(query, filters) + query = query.OrderBy("node.hostname ASC") if page != nil && page.ItemsPerPage != -1 { limit := uint64(page.ItemsPerPage) @@ -363,11 +370,10 @@ func (r *NodeRepository) QueryNodes( nodes := make([]*schema.Node, 0) for rows.Next() { node := schema.Node{} - var timestamp int if err := rows.Scan(&node.Hostname, &node.Cluster, &node.SubCluster, - &node.NodeState, &node.HealthState, ×tamp); err != nil { + &node.NodeState, &node.HealthState); err != nil { rows.Close() - cclog.Warnf("Error while scanning rows (QueryNodes) at time '%d'", timestamp) + cclog.Warn("Error while scanning rows (QueryNodes)") return nil, err } nodes = append(nodes, &node) @@ -377,74 +383,39 @@ func (r *NodeRepository) QueryNodes( } // CountNodes returns the total matched nodes based on a node filter. It always operates -// on the last state (largest timestamp). +// on the last state (largest timestamp) per node. func (r *NodeRepository) CountNodes( ctx context.Context, filters []*model.NodeFilter, ) (int, error) { query, qerr := AccessCheck(ctx, - sq.Select("time_stamp", "count(*) as countRes"). + sq.Select("COUNT(*)"). From("node"). - Join("node_state ON node_state.node_id = node.id")) + Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition())) if qerr != nil { return 0, qerr } - for _, f := range filters { - if f.Cluster != nil { - query = buildStringCondition("cluster", f.Cluster, query) - } - if f.SubCluster != nil { - query = buildStringCondition("subcluster", f.SubCluster, query) - } - if f.Hostname != nil { - query = buildStringCondition("hostname", f.Hostname, query) - } - if f.SchedulerState != nil { - query = query.Where("node_state = ?", f.SchedulerState) - // Requires Additional time_stamp Filter: Else the last (past!) time_stamp with queried state will be returned - // TODO: Hardcoded TimeDiff Suboptimal - Use Config Option? - now := time.Now().Unix() - query = query.Where(sq.Gt{"time_stamp": (now - 300)}) - } - if f.HealthState != nil { - query = query.Where("health_state = ?", f.HealthState) - // Requires Additional time_stamp Filter: Else the last (past!) 
time_stamp with queried state will be returned - // TODO: Hardcoded TimeDiff Suboptimal - Use Config Option? - now := time.Now().Unix() - query = query.Where(sq.Gt{"time_stamp": (now - 300)}) - } - } + query = applyNodeFilters(query, filters) - query = query.GroupBy("time_stamp").OrderBy("time_stamp DESC").Limit(1) - - rows, err := query.RunWith(r.stmtCache).Query() - if err != nil { + var count int + if err := query.RunWith(r.stmtCache).QueryRow().Scan(&count); err != nil { queryString, queryVars, _ := query.ToSql() cclog.Errorf("Error while running query '%s' %v: %v", queryString, queryVars, err) return 0, err } - var totalNodes int - for rows.Next() { - var timestamp int - if err := rows.Scan(×tamp, &totalNodes); err != nil { - rows.Close() - cclog.Warnf("Error while scanning rows (CountNodes) at time '%d'", timestamp) - return 0, err - } - } - - return totalNodes, nil + return count, nil } func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) { - q := sq.Select("node.hostname", "node.cluster", "node.subcluster", "node_state.node_state", - "node_state.health_state", "MAX(node_state.time_stamp) as time"). + q := sq.Select("node.hostname", "node.cluster", "node.subcluster", + "node_state.node_state", "node_state.health_state"). From("node"). Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition()). Where("node.cluster = ?", cluster). - GroupBy("node_state.node_id"). OrderBy("node.hostname ASC") rows, err := q.RunWith(r.DB).Query() @@ -456,10 +427,9 @@ func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) { defer rows.Close() for rows.Next() { node := &schema.Node{} - var timestamp int if err := rows.Scan(&node.Hostname, &node.Cluster, - &node.SubCluster, &node.NodeState, &node.HealthState, ×tamp); err != nil { - cclog.Warnf("Error while scanning node list (ListNodes) at time '%d'", timestamp) + &node.SubCluster, &node.NodeState, &node.HealthState); err != nil { + cclog.Warn("Error while scanning node list (ListNodes)") return nil, err } @@ -470,11 +440,11 @@ func (r *NodeRepository) ListNodes(cluster string) ([]*schema.Node, error) { } func (r *NodeRepository) MapNodes(cluster string) (map[string]string, error) { - q := sq.Select("node.hostname", "node_state.node_state", "MAX(node_state.time_stamp) as time"). + q := sq.Select("node.hostname", "node_state.node_state"). From("node"). Join("node_state ON node_state.node_id = node.id"). + Where(latestStateCondition()). Where("node.cluster = ?", cluster). - GroupBy("node_state.node_id"). OrderBy("node.hostname ASC") rows, err := q.RunWith(r.DB).Query() @@ -487,9 +457,8 @@ func (r *NodeRepository) MapNodes(cluster string) (map[string]string, error) { defer rows.Close() for rows.Next() { var hostname, nodestate string - var timestamp int - if err := rows.Scan(&hostname, &nodestate, ×tamp); err != nil { - cclog.Warnf("Error while scanning node list (MapNodes) at time '%d'", timestamp) + if err := rows.Scan(&hostname, &nodestate); err != nil { + cclog.Warn("Error while scanning node list (MapNodes)") return nil, err } @@ -500,33 +469,16 @@ func (r *NodeRepository) MapNodes(cluster string) (map[string]string, error) { } func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeFilter, column string) ([]*model.NodeStates, error) { - query, qerr := AccessCheck(ctx, sq.Select("hostname", column, "MAX(time_stamp) as time").From("node")) + query, qerr := AccessCheck(ctx, + sq.Select(column). + From("node"). + Join("node_state ON node_state.node_id = node.id"). 
+ Where(latestStateCondition())) if qerr != nil { return nil, qerr } - query = query.Join("node_state ON node_state.node_id = node.id") - - for _, f := range filters { - if f.Hostname != nil { - query = buildStringCondition("hostname", f.Hostname, query) - } - if f.Cluster != nil { - query = buildStringCondition("cluster", f.Cluster, query) - } - if f.SubCluster != nil { - query = buildStringCondition("subcluster", f.SubCluster, query) - } - if f.SchedulerState != nil { - query = query.Where("node_state = ?", f.SchedulerState) - } - if f.HealthState != nil { - query = query.Where("health_state = ?", f.HealthState) - } - } - - // Add Group and Order - query = query.GroupBy("hostname").OrderBy("hostname DESC") + query = applyNodeFilters(query, filters) rows, err := query.RunWith(r.stmtCache).Query() if err != nil { @@ -537,12 +489,10 @@ func (r *NodeRepository) CountStates(ctx context.Context, filters []*model.NodeF stateMap := map[string]int{} for rows.Next() { - var hostname, state string - var timestamp int - - if err := rows.Scan(&hostname, &state, ×tamp); err != nil { + var state string + if err := rows.Scan(&state); err != nil { rows.Close() - cclog.Warnf("Error while scanning rows (CountStates) at time '%d'", timestamp) + cclog.Warn("Error while scanning rows (CountStates)") return nil, err } @@ -735,26 +685,14 @@ func (r *NodeRepository) GetNodesForList( } } else { - // DB Nodes: Count and Find Next Page + // DB Nodes: Count and derive hasNextPage from count var cerr error countNodes, cerr = r.CountNodes(ctx, queryFilters) if cerr != nil { cclog.Warn("error while counting node database data (Resolver.NodeMetricsList)") return nil, nil, 0, false, cerr } - - // Example Page 4 @ 10 IpP : Does item 41 exist? - // Minimal Page 41 @ 1 IpP : If len(result) is 1, Page 5 exists. 
- nextPage := &model.PageRequest{ - ItemsPerPage: 1, - Page: ((page.Page * page.ItemsPerPage) + 1), - } - nextNodes, err := r.QueryNodes(ctx, queryFilters, nextPage, nil) // Order not Used - if err != nil { - cclog.Warn("Error while querying next nodes") - return nil, nil, 0, false, err - } - hasNextPage = len(nextNodes) == 1 + hasNextPage = page.Page*page.ItemsPerPage < countNodes } // Fallback for non-init'd node table in DB; Ignores stateFilter diff --git a/internal/routerConfig/routes.go b/internal/routerConfig/routes.go index e3978ddc..46d1ea5b 100644 --- a/internal/routerConfig/routes.go +++ b/internal/routerConfig/routes.go @@ -50,6 +50,7 @@ var routes []Route = []Route{ {"/monitoring/status/{cluster}", "monitoring/status.tmpl", " Dashboard - ClusterCockpit", false, setupClusterStatusRoute}, {"/monitoring/status/detail/{cluster}", "monitoring/status.tmpl", "Status of - ClusterCockpit", false, setupClusterDetailRoute}, {"/monitoring/dashboard/{cluster}", "monitoring/dashboard.tmpl", " Dashboard - ClusterCockpit", false, setupDashboardRoute}, + {"/monitoring/logs", "monitoring/logs.tmpl", "Logs - ClusterCockpit", false, func(i InfoType, r *http.Request) InfoType { return i }}, } func setupHomeRoute(i InfoType, r *http.Request) InfoType { diff --git a/internal/taskmanager/retentionService.go b/internal/taskmanager/retentionService.go index 453d10bc..d863bb91 100644 --- a/internal/taskmanager/retentionService.go +++ b/internal/taskmanager/retentionService.go @@ -6,157 +6,329 @@ package taskmanager import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" "time" "github.com/ClusterCockpit/cc-backend/pkg/archive" pqarchive "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" "github.com/go-co-op/gocron/v2" ) -func RegisterRetentionDeleteService(age int, includeDB bool, omitTagged bool) { +// createParquetTarget creates a ParquetTarget (file or S3) from the retention config. +func createParquetTarget(cfg Retention) (pqarchive.ParquetTarget, error) { + switch cfg.TargetKind { + case "s3": + return pqarchive.NewS3Target(pqarchive.S3TargetConfig{ + Endpoint: cfg.TargetEndpoint, + Bucket: cfg.TargetBucket, + AccessKey: cfg.TargetAccessKey, + SecretKey: cfg.TargetSecretKey, + Region: cfg.TargetRegion, + UsePathStyle: cfg.TargetUsePathStyle, + }) + default: + return pqarchive.NewFileTarget(cfg.TargetPath) + } +} + +// createTargetBackend creates a secondary archive backend (file or S3) for JSON copy/move. +func createTargetBackend(cfg Retention) (archive.ArchiveBackend, error) { + var raw json.RawMessage + var err error + + switch cfg.TargetKind { + case "s3": + raw, err = json.Marshal(map[string]interface{}{ + "kind": "s3", + "endpoint": cfg.TargetEndpoint, + "bucket": cfg.TargetBucket, + "access-key": cfg.TargetAccessKey, + "secret-key": cfg.TargetSecretKey, + "region": cfg.TargetRegion, + "use-path-style": cfg.TargetUsePathStyle, + }) + default: + raw, err = json.Marshal(map[string]string{ + "kind": "file", + "path": cfg.TargetPath, + }) + } + if err != nil { + return nil, fmt.Errorf("marshal target config: %w", err) + } + return archive.InitBackend(raw) +} + +// transferJobsJSON copies job data from source archive to target backend in JSON format. 
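+// Cluster configs for all referenced clusters are copied first; individual
+// job failures (missing meta/data or import errors) are logged and skipped so
+// that one broken job does not abort the whole transfer.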
+func transferJobsJSON(jobs []*schema.Job, src archive.ArchiveBackend, dst archive.ArchiveBackend) error { + // Transfer cluster configs for all clusters referenced by jobs + clustersDone := make(map[string]bool) + for _, job := range jobs { + if clustersDone[job.Cluster] { + continue + } + clusterCfg, err := src.LoadClusterCfg(job.Cluster) + if err != nil { + cclog.Warnf("Retention: load cluster config %q: %v", job.Cluster, err) + } else { + if err := dst.StoreClusterCfg(job.Cluster, clusterCfg); err != nil { + cclog.Warnf("Retention: store cluster config %q: %v", job.Cluster, err) + } + } + clustersDone[job.Cluster] = true + } + + for _, job := range jobs { + meta, err := src.LoadJobMeta(job) + if err != nil { + cclog.Warnf("Retention: load meta for job %d: %v", job.JobID, err) + continue + } + data, err := src.LoadJobData(job) + if err != nil { + cclog.Warnf("Retention: load data for job %d: %v", job.JobID, err) + continue + } + if err := dst.ImportJob(meta, &data); err != nil { + cclog.Warnf("Retention: import job %d: %v", job.JobID, err) + continue + } + } + return nil +} + +// transferJobsParquet converts jobs to Parquet format, organized by cluster. +func transferJobsParquet(jobs []*schema.Job, src archive.ArchiveBackend, target pqarchive.ParquetTarget, maxSizeMB int) error { + cw := pqarchive.NewClusterAwareParquetWriter(target, maxSizeMB) + + // Set cluster configs for all clusters referenced by jobs + clustersDone := make(map[string]bool) + for _, job := range jobs { + if clustersDone[job.Cluster] { + continue + } + clusterCfg, err := src.LoadClusterCfg(job.Cluster) + if err != nil { + cclog.Warnf("Retention: load cluster config %q: %v", job.Cluster, err) + } else { + cw.SetClusterConfig(job.Cluster, clusterCfg) + } + clustersDone[job.Cluster] = true + } + + for _, job := range jobs { + meta, err := src.LoadJobMeta(job) + if err != nil { + cclog.Warnf("Retention: load meta for job %d: %v", job.JobID, err) + continue + } + data, err := src.LoadJobData(job) + if err != nil { + cclog.Warnf("Retention: load data for job %d: %v", job.JobID, err) + continue + } + row, err := pqarchive.JobToParquetRow(meta, &data) + if err != nil { + cclog.Warnf("Retention: convert job %d: %v", job.JobID, err) + continue + } + if err := cw.AddJob(*row); err != nil { + cclog.Errorf("Retention: add job %d to writer: %v", job.JobID, err) + continue + } + } + + return cw.Close() +} + +// cleanupAfterTransfer removes jobs from archive and optionally from DB. +func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged bool) { + archive.GetHandle().CleanUp(jobs) + + if includeDB { + cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged) + if err != nil { + cclog.Errorf("Retention: delete jobs from db: %v", err) + } else { + cclog.Infof("Retention: removed %d jobs from db", cnt) + } + if err = jobRepo.Optimize(); err != nil { + cclog.Errorf("Retention: db optimization error: %v", err) + } + } +} + +// readCopyMarker reads the last-processed timestamp from a copy marker file. 
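+// For file targets the marker lives at <target-path>/.copy-marker; for S3
+// targets it is kept in a local file under os.TempDir() derived from the
+// bucket name (see copyMarkerPath). A missing or unparsable marker yields 0,
+// i.e. the next copy run starts from the oldest jobs.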
+func readCopyMarker(cfg Retention) int64 { + var data []byte + var err error + + switch cfg.TargetKind { + case "s3": + // For S3 we store the marker locally alongside the config + data, err = os.ReadFile(copyMarkerPath(cfg)) + default: + data, err = os.ReadFile(filepath.Join(cfg.TargetPath, ".copy-marker")) + } + if err != nil { + return 0 + } + ts, err := strconv.ParseInt(strings.TrimSpace(string(data)), 10, 64) + if err != nil { + return 0 + } + return ts +} + +// writeCopyMarker writes the last-processed timestamp to a copy marker file. +func writeCopyMarker(cfg Retention, ts int64) { + content := []byte(strconv.FormatInt(ts, 10)) + var err error + + switch cfg.TargetKind { + case "s3": + err = os.WriteFile(copyMarkerPath(cfg), content, 0o640) + default: + err = os.WriteFile(filepath.Join(cfg.TargetPath, ".copy-marker"), content, 0o640) + } + if err != nil { + cclog.Warnf("Retention: write copy marker: %v", err) + } +} + +func copyMarkerPath(cfg Retention) string { + // For S3 targets, store the marker in a local temp-style path derived from the bucket name + return filepath.Join(os.TempDir(), fmt.Sprintf("cc-copy-marker-%s", cfg.TargetBucket)) +} + +func RegisterRetentionDeleteService(cfg Retention) { cclog.Info("Register retention delete service") s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(3, 0, 0))), gocron.NewTask( func() { - startTime := time.Now().Unix() - int64(age*24*3600) - jobs, err := jobRepo.FindJobsBetween(0, startTime, omitTagged) + startTime := time.Now().Unix() - int64(cfg.Age*24*3600) + jobs, err := jobRepo.FindJobsBetween(0, startTime, cfg.OmitTagged) if err != nil { - cclog.Warnf("Error while looking for retention jobs: %s", err.Error()) - } - archive.GetHandle().CleanUp(jobs) - - if includeDB { - cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged) - if err != nil { - cclog.Errorf("Error while deleting retention jobs from db: %s", err.Error()) - } else { - cclog.Infof("Retention: Removed %d jobs from db", cnt) - } - if err = jobRepo.Optimize(); err != nil { - cclog.Errorf("Error occured in db optimization: %s", err.Error()) - } - } - })) -} - -func RegisterRetentionMoveService(age int, includeDB bool, location string, omitTagged bool) { - cclog.Info("Register retention move service") - - s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(4, 0, 0))), - gocron.NewTask( - func() { - startTime := time.Now().Unix() - int64(age*24*3600) - jobs, err := jobRepo.FindJobsBetween(0, startTime, omitTagged) - if err != nil { - cclog.Warnf("Error while looking for retention jobs: %s", err.Error()) - } - archive.GetHandle().Move(jobs, location) - - if includeDB { - cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged) - if err != nil { - cclog.Errorf("Error while deleting retention jobs from db: %v", err) - } else { - cclog.Infof("Retention: Removed %d jobs from db", cnt) - } - if err = jobRepo.Optimize(); err != nil { - cclog.Errorf("Error occured in db optimization: %v", err) - } - } - })) -} - -func RegisterRetentionParquetService(retention Retention) { - cclog.Info("Register retention parquet service") - - maxFileSizeMB := retention.MaxFileSizeMB - if maxFileSizeMB <= 0 { - maxFileSizeMB = 512 - } - - var target pqarchive.ParquetTarget - var err error - - switch retention.TargetKind { - case "s3": - target, err = pqarchive.NewS3Target(pqarchive.S3TargetConfig{ - Endpoint: retention.TargetEndpoint, - Bucket: retention.TargetBucket, - AccessKey: retention.TargetAccessKey, - SecretKey: retention.TargetSecretKey, - Region: 
retention.TargetRegion, - UsePathStyle: retention.TargetUsePathStyle, - }) - default: - target, err = pqarchive.NewFileTarget(retention.TargetPath) - } - - if err != nil { - cclog.Errorf("Parquet retention: failed to create target: %v", err) - return - } - - s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(5, 0, 0))), - gocron.NewTask( - func() { - startTime := time.Now().Unix() - int64(retention.Age*24*3600) - jobs, err := jobRepo.FindJobsBetween(0, startTime, retention.OmitTagged) - if err != nil { - cclog.Warnf("Parquet retention: error finding jobs: %v", err) + cclog.Warnf("Retention delete: error finding jobs: %v", err) return } if len(jobs) == 0 { return } - cclog.Infof("Parquet retention: processing %d jobs", len(jobs)) - ar := archive.GetHandle() - pw := pqarchive.NewParquetWriter(target, maxFileSizeMB) + cclog.Infof("Retention delete: processing %d jobs", len(jobs)) + cleanupAfterTransfer(jobs, startTime, cfg.IncludeDB, cfg.OmitTagged) + })) +} - for _, job := range jobs { - meta, err := ar.LoadJobMeta(job) - if err != nil { - cclog.Warnf("Parquet retention: load meta for job %d: %v", job.JobID, err) - continue - } +func RegisterRetentionCopyService(cfg Retention) { + cclog.Infof("Register retention copy service (format=%s, target=%s)", cfg.Format, cfg.TargetKind) - data, err := ar.LoadJobData(job) - if err != nil { - cclog.Warnf("Parquet retention: load data for job %d: %v", job.JobID, err) - continue - } + maxFileSizeMB := cfg.MaxFileSizeMB + if maxFileSizeMB <= 0 { + maxFileSizeMB = 512 + } - row, err := pqarchive.JobToParquetRow(meta, &data) - if err != nil { - cclog.Warnf("Parquet retention: convert job %d: %v", job.JobID, err) - continue - } + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(4, 0, 0))), + gocron.NewTask( + func() { + cutoff := time.Now().Unix() - int64(cfg.Age*24*3600) + lastProcessed := readCopyMarker(cfg) - if err := pw.AddJob(*row); err != nil { - cclog.Errorf("Parquet retention: add job %d to writer: %v", job.JobID, err) - continue - } + jobs, err := jobRepo.FindJobsBetween(lastProcessed, cutoff, cfg.OmitTagged) + if err != nil { + cclog.Warnf("Retention copy: error finding jobs: %v", err) + return } - - if err := pw.Close(); err != nil { - cclog.Errorf("Parquet retention: close writer: %v", err) + if len(jobs) == 0 { return } - ar.CleanUp(jobs) + cclog.Infof("Retention copy: processing %d jobs", len(jobs)) + ar := archive.GetHandle() - if retention.IncludeDB { - cnt, err := jobRepo.DeleteJobsBefore(startTime, retention.OmitTagged) + switch cfg.Format { + case "parquet": + target, err := createParquetTarget(cfg) if err != nil { - cclog.Errorf("Parquet retention: delete jobs from db: %v", err) - } else { - cclog.Infof("Parquet retention: removed %d jobs from db", cnt) + cclog.Errorf("Retention copy: create parquet target: %v", err) + return } - if err = jobRepo.Optimize(); err != nil { - cclog.Errorf("Parquet retention: db optimization error: %v", err) + if err := transferJobsParquet(jobs, ar, target, maxFileSizeMB); err != nil { + cclog.Errorf("Retention copy: parquet transfer: %v", err) + return + } + default: // json + dst, err := createTargetBackend(cfg) + if err != nil { + cclog.Errorf("Retention copy: create target backend: %v", err) + return + } + if err := transferJobsJSON(jobs, ar, dst); err != nil { + cclog.Errorf("Retention copy: json transfer: %v", err) + return } } + + writeCopyMarker(cfg, cutoff) + })) +} + +func RegisterRetentionMoveService(cfg Retention) { + cclog.Infof("Register retention move service 
(format=%s, target=%s)", cfg.Format, cfg.TargetKind) + + maxFileSizeMB := cfg.MaxFileSizeMB + if maxFileSizeMB <= 0 { + maxFileSizeMB = 512 + } + + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(5, 0, 0))), + gocron.NewTask( + func() { + startTime := time.Now().Unix() - int64(cfg.Age*24*3600) + jobs, err := jobRepo.FindJobsBetween(0, startTime, cfg.OmitTagged) + if err != nil { + cclog.Warnf("Retention move: error finding jobs: %v", err) + return + } + if len(jobs) == 0 { + return + } + + cclog.Infof("Retention move: processing %d jobs", len(jobs)) + ar := archive.GetHandle() + + switch cfg.Format { + case "parquet": + target, err := createParquetTarget(cfg) + if err != nil { + cclog.Errorf("Retention move: create parquet target: %v", err) + return + } + if err := transferJobsParquet(jobs, ar, target, maxFileSizeMB); err != nil { + cclog.Errorf("Retention move: parquet transfer: %v", err) + return + } + default: // json + dst, err := createTargetBackend(cfg) + if err != nil { + cclog.Errorf("Retention move: create target backend: %v", err) + return + } + if err := transferJobsJSON(jobs, ar, dst); err != nil { + cclog.Errorf("Retention move: json transfer: %v", err) + return + } + } + + cleanupAfterTransfer(jobs, startTime, cfg.IncludeDB, cfg.OmitTagged) })) } diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index 8cf6b4e6..529395b5 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -23,20 +23,20 @@ const ( // Retention defines the configuration for job retention policies. type Retention struct { - Policy string `json:"policy"` - Location string `json:"location"` - Age int `json:"age"` - IncludeDB bool `json:"includeDB"` - OmitTagged bool `json:"omitTagged"` - TargetKind string `json:"target-kind"` - TargetPath string `json:"target-path"` - TargetEndpoint string `json:"target-endpoint"` - TargetBucket string `json:"target-bucket"` - TargetAccessKey string `json:"target-access-key"` - TargetSecretKey string `json:"target-secret-key"` - TargetRegion string `json:"target-region"` - TargetUsePathStyle bool `json:"target-use-path-style"` - MaxFileSizeMB int `json:"max-file-size-mb"` + Policy string `json:"policy"` + Format string `json:"format"` + Age int `json:"age"` + IncludeDB bool `json:"includeDB"` + OmitTagged bool `json:"omitTagged"` + TargetKind string `json:"target-kind"` + TargetPath string `json:"target-path"` + TargetEndpoint string `json:"target-endpoint"` + TargetBucket string `json:"target-bucket"` + TargetAccessKey string `json:"target-access-key"` + TargetSecretKey string `json:"target-secret-key"` + TargetRegion string `json:"target-region"` + TargetUsePathStyle bool `json:"target-use-path-style"` + MaxFileSizeMB int `json:"max-file-size-mb"` } // CronFrequency defines the execution intervals for various background workers. 
@@ -86,18 +86,11 @@ func initArchiveServices(config json.RawMessage) { switch cfg.Retention.Policy { case "delete": - RegisterRetentionDeleteService( - cfg.Retention.Age, - cfg.Retention.IncludeDB, - cfg.Retention.OmitTagged) + RegisterRetentionDeleteService(cfg.Retention) + case "copy": + RegisterRetentionCopyService(cfg.Retention) case "move": - RegisterRetentionMoveService( - cfg.Retention.Age, - cfg.Retention.IncludeDB, - cfg.Retention.Location, - cfg.Retention.OmitTagged) - case "parquet": - RegisterRetentionParquetService(cfg.Retention) + RegisterRetentionMoveService(cfg.Retention) } if cfg.Compression > 0 { diff --git a/pkg/archive/ConfigSchema.go b/pkg/archive/ConfigSchema.go index db568200..cb9b16bc 100644 --- a/pkg/archive/ConfigSchema.go +++ b/pkg/archive/ConfigSchema.go @@ -57,7 +57,12 @@ var configSchema = ` "policy": { "description": "Retention policy", "type": "string", - "enum": ["none", "delete", "move", "parquet"] + "enum": ["none", "delete", "copy", "move"] + }, + "format": { + "description": "Output format for copy/move policies", + "type": "string", + "enum": ["json", "parquet"] }, "include-db": { "description": "Also remove jobs from database", @@ -67,41 +72,37 @@ var configSchema = ` "description": "Act on jobs with startTime older than age (in days)", "type": "integer" }, - "location": { - "description": "The target directory for retention. Only applicable for retention move.", - "type": "string" - }, "target-kind": { - "description": "Target storage kind for parquet retention: file or s3", + "description": "Target storage kind: file or s3", "type": "string", "enum": ["file", "s3"] }, "target-path": { - "description": "Target directory path for parquet file storage", + "description": "Target directory path for file storage", "type": "string" }, "target-endpoint": { - "description": "S3 endpoint URL for parquet target", + "description": "S3 endpoint URL for target", "type": "string" }, "target-bucket": { - "description": "S3 bucket name for parquet target", + "description": "S3 bucket name for target", "type": "string" }, "target-access-key": { - "description": "S3 access key for parquet target", + "description": "S3 access key for target", "type": "string" }, "target-secret-key": { - "description": "S3 secret key for parquet target", + "description": "S3 secret key for target", "type": "string" }, "target-region": { - "description": "S3 region for parquet target", + "description": "S3 region for target", "type": "string" }, "target-use-path-style": { - "description": "Use path-style S3 URLs for parquet target", + "description": "Use path-style S3 URLs for target", "type": "boolean" }, "max-file-size-mb": { diff --git a/pkg/archive/parquet/convert.go b/pkg/archive/parquet/convert.go index ba1e76eb..43e611e4 100644 --- a/pkg/archive/parquet/convert.go +++ b/pkg/archive/parquet/convert.go @@ -93,6 +93,91 @@ func JobToParquetRow(meta *schema.Job, data *schema.JobData) (*ParquetJobRow, er }, nil } +// ParquetRowToJob converts a ParquetJobRow back into job metadata and metric data. +// This is the reverse of JobToParquetRow. 
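+// Scalar columns are copied directly; the JSON-encoded columns (resources,
+// statistics, tags, metadata and footprints) are unmarshalled only when
+// present, and the gzip-compressed metric data blob is decompressed back
+// into schema.JobData.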
+func ParquetRowToJob(row *ParquetJobRow) (*schema.Job, *schema.JobData, error) { + meta := &schema.Job{ + JobID: row.JobID, + Cluster: row.Cluster, + SubCluster: row.SubCluster, + Partition: row.Partition, + Project: row.Project, + User: row.User, + State: schema.JobState(row.State), + StartTime: row.StartTime, + Duration: row.Duration, + Walltime: row.Walltime, + NumNodes: row.NumNodes, + NumHWThreads: row.NumHWThreads, + NumAcc: row.NumAcc, + Energy: row.Energy, + SMT: row.SMT, + } + + if len(row.ResourcesJSON) > 0 { + if err := json.Unmarshal(row.ResourcesJSON, &meta.Resources); err != nil { + return nil, nil, fmt.Errorf("unmarshal resources: %w", err) + } + } + + if len(row.StatisticsJSON) > 0 { + if err := json.Unmarshal(row.StatisticsJSON, &meta.Statistics); err != nil { + return nil, nil, fmt.Errorf("unmarshal statistics: %w", err) + } + } + + if len(row.TagsJSON) > 0 { + if err := json.Unmarshal(row.TagsJSON, &meta.Tags); err != nil { + return nil, nil, fmt.Errorf("unmarshal tags: %w", err) + } + } + + if len(row.MetaDataJSON) > 0 { + if err := json.Unmarshal(row.MetaDataJSON, &meta.MetaData); err != nil { + return nil, nil, fmt.Errorf("unmarshal metadata: %w", err) + } + } + + if len(row.FootprintJSON) > 0 { + if err := json.Unmarshal(row.FootprintJSON, &meta.Footprint); err != nil { + return nil, nil, fmt.Errorf("unmarshal footprint: %w", err) + } + } + + if len(row.EnergyFootJSON) > 0 { + if err := json.Unmarshal(row.EnergyFootJSON, &meta.EnergyFootprint); err != nil { + return nil, nil, fmt.Errorf("unmarshal energy footprint: %w", err) + } + } + + data, err := decompressJobData(row.MetricDataGz) + if err != nil { + return nil, nil, fmt.Errorf("decompress metric data: %w", err) + } + + return meta, data, nil +} + +func decompressJobData(data []byte) (*schema.JobData, error) { + gz, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, err + } + defer gz.Close() + + var buf bytes.Buffer + if _, err := buf.ReadFrom(gz); err != nil { + return nil, err + } + + var jobData schema.JobData + if err := json.Unmarshal(buf.Bytes(), &jobData); err != nil { + return nil, err + } + + return &jobData, nil +} + func compressJobData(data *schema.JobData) ([]byte, error) { jsonBytes, err := json.Marshal(data) if err != nil { diff --git a/pkg/archive/parquet/convert_test.go b/pkg/archive/parquet/convert_test.go new file mode 100644 index 00000000..3b2848ba --- /dev/null +++ b/pkg/archive/parquet/convert_test.go @@ -0,0 +1,305 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. 
+ +package parquet + +import ( + "testing" + + "github.com/ClusterCockpit/cc-lib/v2/schema" +) + +func TestParquetRowToJob(t *testing.T) { + meta := &schema.Job{ + JobID: 42, + Cluster: "testcluster", + SubCluster: "sc0", + Partition: "main", + Project: "testproject", + User: "testuser", + State: schema.JobStateCompleted, + StartTime: 1700000000, + Duration: 3600, + Walltime: 7200, + NumNodes: 2, + NumHWThreads: 16, + NumAcc: 4, + Energy: 123.45, + SMT: 2, + Resources: []*schema.Resource{ + {Hostname: "node001", HWThreads: []int{0, 1, 2, 3}}, + {Hostname: "node002", HWThreads: []int{4, 5, 6, 7}}, + }, + Statistics: map[string]schema.JobStatistics{ + "cpu_load": {Avg: 50.0, Min: 10.0, Max: 90.0}, + }, + Tags: []*schema.Tag{ + {Type: "test", Name: "tag1"}, + }, + MetaData: map[string]string{ + "key1": "value1", + }, + Footprint: map[string]float64{ + "cpu_load": 50.0, + }, + EnergyFootprint: map[string]float64{ + "total": 123.45, + }, + } + + data := &schema.JobData{ + "cpu_load": { + schema.MetricScopeNode: &schema.JobMetric{ + Unit: schema.Unit{Base: ""}, + Timestep: 60, + Series: []schema.Series{ + { + Hostname: "node001", + Data: []schema.Float{1.0, 2.0, 3.0}, + }, + }, + }, + }, + } + + // Convert to parquet row + row, err := JobToParquetRow(meta, data) + if err != nil { + t.Fatalf("JobToParquetRow: %v", err) + } + + // Convert back + gotMeta, gotData, err := ParquetRowToJob(row) + if err != nil { + t.Fatalf("ParquetRowToJob: %v", err) + } + + // Verify scalar fields + if gotMeta.JobID != meta.JobID { + t.Errorf("JobID = %d, want %d", gotMeta.JobID, meta.JobID) + } + if gotMeta.Cluster != meta.Cluster { + t.Errorf("Cluster = %q, want %q", gotMeta.Cluster, meta.Cluster) + } + if gotMeta.SubCluster != meta.SubCluster { + t.Errorf("SubCluster = %q, want %q", gotMeta.SubCluster, meta.SubCluster) + } + if gotMeta.Partition != meta.Partition { + t.Errorf("Partition = %q, want %q", gotMeta.Partition, meta.Partition) + } + if gotMeta.Project != meta.Project { + t.Errorf("Project = %q, want %q", gotMeta.Project, meta.Project) + } + if gotMeta.User != meta.User { + t.Errorf("User = %q, want %q", gotMeta.User, meta.User) + } + if gotMeta.State != meta.State { + t.Errorf("State = %q, want %q", gotMeta.State, meta.State) + } + if gotMeta.StartTime != meta.StartTime { + t.Errorf("StartTime = %d, want %d", gotMeta.StartTime, meta.StartTime) + } + if gotMeta.Duration != meta.Duration { + t.Errorf("Duration = %d, want %d", gotMeta.Duration, meta.Duration) + } + if gotMeta.Walltime != meta.Walltime { + t.Errorf("Walltime = %d, want %d", gotMeta.Walltime, meta.Walltime) + } + if gotMeta.NumNodes != meta.NumNodes { + t.Errorf("NumNodes = %d, want %d", gotMeta.NumNodes, meta.NumNodes) + } + if gotMeta.NumHWThreads != meta.NumHWThreads { + t.Errorf("NumHWThreads = %d, want %d", gotMeta.NumHWThreads, meta.NumHWThreads) + } + if gotMeta.NumAcc != meta.NumAcc { + t.Errorf("NumAcc = %d, want %d", gotMeta.NumAcc, meta.NumAcc) + } + if gotMeta.Energy != meta.Energy { + t.Errorf("Energy = %f, want %f", gotMeta.Energy, meta.Energy) + } + if gotMeta.SMT != meta.SMT { + t.Errorf("SMT = %d, want %d", gotMeta.SMT, meta.SMT) + } + + // Verify complex fields + if len(gotMeta.Resources) != 2 { + t.Fatalf("Resources len = %d, want 2", len(gotMeta.Resources)) + } + if gotMeta.Resources[0].Hostname != "node001" { + t.Errorf("Resources[0].Hostname = %q, want %q", gotMeta.Resources[0].Hostname, "node001") + } + if len(gotMeta.Resources[0].HWThreads) != 4 { + t.Errorf("Resources[0].HWThreads len = %d, want 4", 
len(gotMeta.Resources[0].HWThreads)) + } + + if len(gotMeta.Statistics) != 1 { + t.Fatalf("Statistics len = %d, want 1", len(gotMeta.Statistics)) + } + if stat, ok := gotMeta.Statistics["cpu_load"]; !ok { + t.Error("Statistics missing cpu_load") + } else if stat.Avg != 50.0 { + t.Errorf("Statistics[cpu_load].Avg = %f, want 50.0", stat.Avg) + } + + if len(gotMeta.Tags) != 1 || gotMeta.Tags[0].Name != "tag1" { + t.Errorf("Tags = %v, want [{test tag1}]", gotMeta.Tags) + } + + if gotMeta.MetaData["key1"] != "value1" { + t.Errorf("MetaData[key1] = %q, want %q", gotMeta.MetaData["key1"], "value1") + } + + if gotMeta.Footprint["cpu_load"] != 50.0 { + t.Errorf("Footprint[cpu_load] = %f, want 50.0", gotMeta.Footprint["cpu_load"]) + } + + if gotMeta.EnergyFootprint["total"] != 123.45 { + t.Errorf("EnergyFootprint[total] = %f, want 123.45", gotMeta.EnergyFootprint["total"]) + } + + // Verify metric data + if gotData == nil { + t.Fatal("JobData is nil") + } + cpuLoad, ok := (*gotData)["cpu_load"] + if !ok { + t.Fatal("JobData missing cpu_load") + } + nodeMetric, ok := cpuLoad[schema.MetricScopeNode] + if !ok { + t.Fatal("cpu_load missing node scope") + } + if nodeMetric.Timestep != 60 { + t.Errorf("Timestep = %d, want 60", nodeMetric.Timestep) + } + if len(nodeMetric.Series) != 1 { + t.Fatalf("Series len = %d, want 1", len(nodeMetric.Series)) + } + if nodeMetric.Series[0].Hostname != "node001" { + t.Errorf("Series[0].Hostname = %q, want %q", nodeMetric.Series[0].Hostname, "node001") + } + if len(nodeMetric.Series[0].Data) != 3 { + t.Errorf("Series[0].Data len = %d, want 3", len(nodeMetric.Series[0].Data)) + } +} + +func TestParquetRowToJobNilOptionalFields(t *testing.T) { + meta := &schema.Job{ + JobID: 1, + Cluster: "test", + SubCluster: "sc0", + Project: "proj", + User: "user", + State: schema.JobStateCompleted, + StartTime: 1700000000, + Duration: 60, + NumNodes: 1, + Resources: []*schema.Resource{ + {Hostname: "node001"}, + }, + } + + data := &schema.JobData{ + "cpu_load": { + schema.MetricScopeNode: &schema.JobMetric{ + Timestep: 60, + Series: []schema.Series{ + {Hostname: "node001", Data: []schema.Float{1.0}}, + }, + }, + }, + } + + row, err := JobToParquetRow(meta, data) + if err != nil { + t.Fatalf("JobToParquetRow: %v", err) + } + + gotMeta, gotData, err := ParquetRowToJob(row) + if err != nil { + t.Fatalf("ParquetRowToJob: %v", err) + } + + if gotMeta.JobID != 1 { + t.Errorf("JobID = %d, want 1", gotMeta.JobID) + } + if gotMeta.Tags != nil { + t.Errorf("Tags should be nil, got %v", gotMeta.Tags) + } + if gotMeta.Statistics != nil { + t.Errorf("Statistics should be nil, got %v", gotMeta.Statistics) + } + if gotMeta.MetaData != nil { + t.Errorf("MetaData should be nil, got %v", gotMeta.MetaData) + } + if gotMeta.Footprint != nil { + t.Errorf("Footprint should be nil, got %v", gotMeta.Footprint) + } + if gotMeta.EnergyFootprint != nil { + t.Errorf("EnergyFootprint should be nil, got %v", gotMeta.EnergyFootprint) + } + if gotData == nil { + t.Fatal("JobData is nil") + } +} + +func TestRoundTripThroughParquetFile(t *testing.T) { + meta, data := makeTestJob(999) + meta.Tags = []*schema.Tag{{Type: "test", Name: "roundtrip"}} + + // Convert to row and write to parquet + row, err := JobToParquetRow(meta, data) + if err != nil { + t.Fatalf("JobToParquetRow: %v", err) + } + + // Write to parquet bytes + parquetBytes, err := writeParquetBytes([]ParquetJobRow{*row}) + if err != nil { + t.Fatalf("writeParquetBytes: %v", err) + } + + // Read back from parquet bytes + rows, err := 
ReadParquetFile(parquetBytes) + if err != nil { + t.Fatalf("ReadParquetFile: %v", err) + } + if len(rows) != 1 { + t.Fatalf("expected 1 row, got %d", len(rows)) + } + + // Convert back to job + gotMeta, gotData, err := ParquetRowToJob(&rows[0]) + if err != nil { + t.Fatalf("ParquetRowToJob: %v", err) + } + + // Verify key fields survived the round trip + if gotMeta.JobID != 999 { + t.Errorf("JobID = %d, want 999", gotMeta.JobID) + } + if gotMeta.Cluster != "testcluster" { + t.Errorf("Cluster = %q, want %q", gotMeta.Cluster, "testcluster") + } + if gotMeta.User != "testuser" { + t.Errorf("User = %q, want %q", gotMeta.User, "testuser") + } + if gotMeta.State != schema.JobStateCompleted { + t.Errorf("State = %q, want %q", gotMeta.State, schema.JobStateCompleted) + } + if len(gotMeta.Tags) != 1 || gotMeta.Tags[0].Name != "roundtrip" { + t.Errorf("Tags = %v, want [{test roundtrip}]", gotMeta.Tags) + } + if len(gotMeta.Resources) != 2 { + t.Errorf("Resources len = %d, want 2", len(gotMeta.Resources)) + } + + if gotData == nil { + t.Fatal("JobData is nil") + } + if _, ok := (*gotData)["cpu_load"]; !ok { + t.Error("JobData missing cpu_load") + } +} diff --git a/pkg/archive/parquet/reader.go b/pkg/archive/parquet/reader.go new file mode 100644 index 00000000..32486bd5 --- /dev/null +++ b/pkg/archive/parquet/reader.go @@ -0,0 +1,216 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. This file is part of cc-backend. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package parquet + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/ClusterCockpit/cc-lib/v2/schema" + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + pq "github.com/parquet-go/parquet-go" +) + +// ReadParquetFile reads all ParquetJobRow entries from parquet-encoded bytes. +func ReadParquetFile(data []byte) ([]ParquetJobRow, error) { + file, err := pq.OpenFile(bytes.NewReader(data), int64(len(data))) + if err != nil { + return nil, fmt.Errorf("open parquet: %w", err) + } + + reader := pq.NewGenericReader[ParquetJobRow](file) + defer reader.Close() + + numRows := file.NumRows() + rows := make([]ParquetJobRow, numRows) + n, err := reader.Read(rows) + if err != nil && err != io.EOF { + return nil, fmt.Errorf("read parquet rows: %w", err) + } + + return rows[:n], nil +} + +// ParquetSource abstracts reading parquet archives from different storage backends. +type ParquetSource interface { + GetClusters() ([]string, error) + ListParquetFiles(cluster string) ([]string, error) + ReadFile(path string) ([]byte, error) + ReadClusterConfig(cluster string) (*schema.Cluster, error) +} + +// FileParquetSource reads parquet archives from a local filesystem directory. 
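+// The expected layout matches what ClusterAwareParquetWriter produces: one
+// subdirectory per cluster containing *.parquet files plus a cluster.json.
+//
+// Minimal usage sketch (hypothetical path, error handling omitted):
+//
+//	src := NewFileParquetSource("/var/cc-parquet")
+//	files, _ := src.ListParquetFiles("fritz")
+//	raw, _ := src.ReadFile(files[0])
+//	rows, _ := ReadParquetFile(raw)
+//	meta, jobData, _ := ParquetRowToJob(&rows[0])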
+type FileParquetSource struct { + path string +} + +func NewFileParquetSource(path string) *FileParquetSource { + return &FileParquetSource{path: path} +} + +func (fs *FileParquetSource) GetClusters() ([]string, error) { + entries, err := os.ReadDir(fs.path) + if err != nil { + return nil, fmt.Errorf("read directory: %w", err) + } + + var clusters []string + for _, e := range entries { + if e.IsDir() { + clusters = append(clusters, e.Name()) + } + } + return clusters, nil +} + +func (fs *FileParquetSource) ListParquetFiles(cluster string) ([]string, error) { + dir := filepath.Join(fs.path, cluster) + entries, err := os.ReadDir(dir) + if err != nil { + return nil, fmt.Errorf("read cluster directory: %w", err) + } + + var files []string + for _, e := range entries { + if !e.IsDir() && strings.HasSuffix(e.Name(), ".parquet") { + files = append(files, filepath.Join(cluster, e.Name())) + } + } + return files, nil +} + +func (fs *FileParquetSource) ReadFile(path string) ([]byte, error) { + return os.ReadFile(filepath.Join(fs.path, path)) +} + +func (fs *FileParquetSource) ReadClusterConfig(cluster string) (*schema.Cluster, error) { + data, err := os.ReadFile(filepath.Join(fs.path, cluster, "cluster.json")) + if err != nil { + return nil, fmt.Errorf("read cluster.json: %w", err) + } + var cfg schema.Cluster + if err := json.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("unmarshal cluster config: %w", err) + } + return &cfg, nil +} + +// S3ParquetSource reads parquet archives from an S3-compatible object store. +type S3ParquetSource struct { + client *s3.Client + bucket string +} + +func NewS3ParquetSource(cfg S3TargetConfig) (*S3ParquetSource, error) { + if cfg.Bucket == "" { + return nil, fmt.Errorf("S3 source: empty bucket name") + } + + region := cfg.Region + if region == "" { + region = "us-east-1" + } + + awsCfg, err := awsconfig.LoadDefaultConfig(context.Background(), + awsconfig.WithRegion(region), + awsconfig.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider(cfg.AccessKey, cfg.SecretKey, ""), + ), + ) + if err != nil { + return nil, fmt.Errorf("S3 source: load AWS config: %w", err) + } + + opts := func(o *s3.Options) { + if cfg.Endpoint != "" { + o.BaseEndpoint = aws.String(cfg.Endpoint) + } + o.UsePathStyle = cfg.UsePathStyle + } + + client := s3.NewFromConfig(awsCfg, opts) + return &S3ParquetSource{client: client, bucket: cfg.Bucket}, nil +} + +func (ss *S3ParquetSource) GetClusters() ([]string, error) { + ctx := context.Background() + paginator := s3.NewListObjectsV2Paginator(ss.client, &s3.ListObjectsV2Input{ + Bucket: aws.String(ss.bucket), + Delimiter: aws.String("/"), + }) + + var clusters []string + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("S3 source: list clusters: %w", err) + } + for _, prefix := range page.CommonPrefixes { + if prefix.Prefix != nil { + name := strings.TrimSuffix(*prefix.Prefix, "/") + clusters = append(clusters, name) + } + } + } + return clusters, nil +} + +func (ss *S3ParquetSource) ListParquetFiles(cluster string) ([]string, error) { + ctx := context.Background() + prefix := cluster + "/" + paginator := s3.NewListObjectsV2Paginator(ss.client, &s3.ListObjectsV2Input{ + Bucket: aws.String(ss.bucket), + Prefix: aws.String(prefix), + }) + + var files []string + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("S3 source: list parquet files: %w", err) + } + for _, obj := range page.Contents { + 
if obj.Key != nil && strings.HasSuffix(*obj.Key, ".parquet") { + files = append(files, *obj.Key) + } + } + } + return files, nil +} + +func (ss *S3ParquetSource) ReadFile(path string) ([]byte, error) { + ctx := context.Background() + result, err := ss.client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(ss.bucket), + Key: aws.String(path), + }) + if err != nil { + return nil, fmt.Errorf("S3 source: get object %q: %w", path, err) + } + defer result.Body.Close() + return io.ReadAll(result.Body) +} + +func (ss *S3ParquetSource) ReadClusterConfig(cluster string) (*schema.Cluster, error) { + data, err := ss.ReadFile(cluster + "/cluster.json") + if err != nil { + return nil, fmt.Errorf("read cluster.json: %w", err) + } + var cfg schema.Cluster + if err := json.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("unmarshal cluster config: %w", err) + } + return &cfg, nil +} diff --git a/pkg/archive/parquet/target.go b/pkg/archive/parquet/target.go index 0e8babc2..090a230d 100644 --- a/pkg/archive/parquet/target.go +++ b/pkg/archive/parquet/target.go @@ -36,7 +36,11 @@ func NewFileTarget(path string) (*FileTarget, error) { } func (ft *FileTarget) WriteFile(name string, data []byte) error { - return os.WriteFile(filepath.Join(ft.path, name), data, 0o640) + fullPath := filepath.Join(ft.path, name) + if err := os.MkdirAll(filepath.Dir(fullPath), 0o750); err != nil { + return fmt.Errorf("create parent directory: %w", err) + } + return os.WriteFile(fullPath, data, 0o640) } // S3TargetConfig holds the configuration for an S3 parquet target. diff --git a/pkg/archive/parquet/writer.go b/pkg/archive/parquet/writer.go index ab56cace..2669a9c8 100644 --- a/pkg/archive/parquet/writer.go +++ b/pkg/archive/parquet/writer.go @@ -7,10 +7,13 @@ package parquet import ( "bytes" + "encoding/json" "fmt" + "path" "time" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" pq "github.com/parquet-go/parquet-go" ) @@ -111,3 +114,68 @@ func estimateRowSize(row *ParquetJobRow) int64 { size += int64(len(row.MetricDataGz)) return size } + +// prefixedTarget wraps a ParquetTarget and prepends a path prefix to all file names. +type prefixedTarget struct { + inner ParquetTarget + prefix string +} + +func (pt *prefixedTarget) WriteFile(name string, data []byte) error { + return pt.inner.WriteFile(path.Join(pt.prefix, name), data) +} + +// ClusterAwareParquetWriter organizes Parquet output by cluster. +// Each cluster gets its own subdirectory with a cluster.json config file. +type ClusterAwareParquetWriter struct { + target ParquetTarget + maxSizeMB int + writers map[string]*ParquetWriter + clusterCfgs map[string]*schema.Cluster +} + +// NewClusterAwareParquetWriter creates a writer that routes jobs to per-cluster ParquetWriters. +func NewClusterAwareParquetWriter(target ParquetTarget, maxSizeMB int) *ClusterAwareParquetWriter { + return &ClusterAwareParquetWriter{ + target: target, + maxSizeMB: maxSizeMB, + writers: make(map[string]*ParquetWriter), + clusterCfgs: make(map[string]*schema.Cluster), + } +} + +// SetClusterConfig stores a cluster configuration to be written as cluster.json on Close. +func (cw *ClusterAwareParquetWriter) SetClusterConfig(name string, cfg *schema.Cluster) { + cw.clusterCfgs[name] = cfg +} + +// AddJob routes the job row to the appropriate per-cluster writer. 
+func (cw *ClusterAwareParquetWriter) AddJob(row ParquetJobRow) error { + cluster := row.Cluster + pw, ok := cw.writers[cluster] + if !ok { + pw = NewParquetWriter(&prefixedTarget{inner: cw.target, prefix: cluster}, cw.maxSizeMB) + cw.writers[cluster] = pw + } + return pw.AddJob(row) +} + +// Close writes cluster.json files and flushes all per-cluster writers. +func (cw *ClusterAwareParquetWriter) Close() error { + for name, cfg := range cw.clusterCfgs { + data, err := json.MarshalIndent(cfg, "", " ") + if err != nil { + return fmt.Errorf("marshal cluster config %q: %w", name, err) + } + if err := cw.target.WriteFile(path.Join(name, "cluster.json"), data); err != nil { + return fmt.Errorf("write cluster.json for %q: %w", name, err) + } + } + + for cluster, pw := range cw.writers { + if err := pw.Close(); err != nil { + return fmt.Errorf("close writer for cluster %q: %w", cluster, err) + } + } + return nil +} diff --git a/pkg/archive/parquet/writer_test.go b/pkg/archive/parquet/writer_test.go index e532e472..57b4ca4c 100644 --- a/pkg/archive/parquet/writer_test.go +++ b/pkg/archive/parquet/writer_test.go @@ -10,6 +10,9 @@ import ( "compress/gzip" "encoding/json" "io" + "os" + "path/filepath" + "strings" "sync" "testing" @@ -222,3 +225,137 @@ func TestFileTarget(t *testing.T) { // Verify file exists and has correct content // (using the target itself is sufficient; we just check no error) } + +func TestFileTargetSubdirectories(t *testing.T) { + dir := t.TempDir() + ft, err := NewFileTarget(dir) + if err != nil { + t.Fatalf("NewFileTarget: %v", err) + } + + testData := []byte("test data in subdir") + if err := ft.WriteFile("fritz/cc-archive-2025-01-20-001.parquet", testData); err != nil { + t.Fatalf("WriteFile with subdir: %v", err) + } + + // Verify file was created in subdirectory + content, err := os.ReadFile(filepath.Join(dir, "fritz", "cc-archive-2025-01-20-001.parquet")) + if err != nil { + t.Fatalf("read file in subdir: %v", err) + } + if !bytes.Equal(content, testData) { + t.Error("file content mismatch") + } +} + +func makeTestJobForCluster(jobID int64, cluster string) (*schema.Job, *schema.JobData) { + meta, data := makeTestJob(jobID) + meta.Cluster = cluster + return meta, data +} + +func TestClusterAwareParquetWriter(t *testing.T) { + target := newMemTarget() + cw := NewClusterAwareParquetWriter(target, 512) + + // Set cluster configs + cw.SetClusterConfig("fritz", &schema.Cluster{Name: "fritz"}) + cw.SetClusterConfig("alex", &schema.Cluster{Name: "alex"}) + + // Add jobs from different clusters + for i := int64(0); i < 3; i++ { + meta, data := makeTestJobForCluster(i, "fritz") + row, err := JobToParquetRow(meta, data) + if err != nil { + t.Fatalf("convert fritz job %d: %v", i, err) + } + if err := cw.AddJob(*row); err != nil { + t.Fatalf("add fritz job %d: %v", i, err) + } + } + + for i := int64(10); i < 12; i++ { + meta, data := makeTestJobForCluster(i, "alex") + row, err := JobToParquetRow(meta, data) + if err != nil { + t.Fatalf("convert alex job %d: %v", i, err) + } + if err := cw.AddJob(*row); err != nil { + t.Fatalf("add alex job %d: %v", i, err) + } + } + + if err := cw.Close(); err != nil { + t.Fatalf("close: %v", err) + } + + target.mu.Lock() + defer target.mu.Unlock() + + // Check cluster.json files were written + if _, ok := target.files["fritz/cluster.json"]; !ok { + t.Error("missing fritz/cluster.json") + } + if _, ok := target.files["alex/cluster.json"]; !ok { + t.Error("missing alex/cluster.json") + } + + // Verify cluster.json content + var clusterCfg 
schema.Cluster + if err := json.Unmarshal(target.files["fritz/cluster.json"], &clusterCfg); err != nil { + t.Fatalf("unmarshal fritz cluster.json: %v", err) + } + if clusterCfg.Name != "fritz" { + t.Errorf("fritz cluster name = %q, want %q", clusterCfg.Name, "fritz") + } + + // Check parquet files are in cluster subdirectories + fritzParquets := 0 + alexParquets := 0 + for name := range target.files { + if strings.HasPrefix(name, "fritz/") && strings.HasSuffix(name, ".parquet") { + fritzParquets++ + } + if strings.HasPrefix(name, "alex/") && strings.HasSuffix(name, ".parquet") { + alexParquets++ + } + } + if fritzParquets == 0 { + t.Error("no parquet files in fritz/") + } + if alexParquets == 0 { + t.Error("no parquet files in alex/") + } + + // Verify parquet files are readable and have correct row counts + for name, data := range target.files { + if !strings.HasSuffix(name, ".parquet") { + continue + } + file := bytes.NewReader(data) + pf, err := pq.OpenFile(file, int64(len(data))) + if err != nil { + t.Errorf("open parquet %s: %v", name, err) + continue + } + if strings.HasPrefix(name, "fritz/") && pf.NumRows() != 3 { + t.Errorf("fritz parquet rows = %d, want 3", pf.NumRows()) + } + if strings.HasPrefix(name, "alex/") && pf.NumRows() != 2 { + t.Errorf("alex parquet rows = %d, want 2", pf.NumRows()) + } + } +} + +func TestClusterAwareParquetWriterEmpty(t *testing.T) { + target := newMemTarget() + cw := NewClusterAwareParquetWriter(target, 512) + + if err := cw.Close(); err != nil { + t.Fatalf("close empty writer: %v", err) + } + + if len(target.files) != 0 { + t.Errorf("expected no files for empty writer, got %d", len(target.files)) + } +} diff --git a/tools/archive-manager/README.md b/tools/archive-manager/README.md new file mode 100644 index 00000000..c006a63e --- /dev/null +++ b/tools/archive-manager/README.md @@ -0,0 +1,148 @@ +# Archive Manager + +## Overview + +The `archive-manager` tool manages ClusterCockpit job archives. It supports inspecting archives, validating jobs, removing jobs by date range, importing jobs between archive backends, and converting archives between JSON and Parquet formats. 
+ +## Features + +- **Archive Info**: Display statistics about an existing job archive +- **Validation**: Validate job archives against the JSON schema +- **Cleanup**: Remove jobs by date range +- **Import**: Copy jobs between archive backends (file, S3, SQLite) with parallel processing +- **Convert**: Convert archives between JSON and Parquet formats (both directions) +- **Progress Reporting**: Real-time progress display with ETA and throughput metrics +- **Graceful Interruption**: CTRL-C stops processing after finishing current jobs + +## Usage + +### Build + +```bash +go build ./tools/archive-manager/ +``` + +### Archive Info + +Display statistics about a job archive: + +```bash +./archive-manager -s ./var/job-archive +``` + +### Validate Archive + +```bash +./archive-manager -s ./var/job-archive --validate --config ./config.json +``` + +### Remove Jobs by Date + +```bash +# Remove jobs started before a date +./archive-manager -s ./var/job-archive --remove-before 2023-Jan-01 --config ./config.json + +# Remove jobs started after a date +./archive-manager -s ./var/job-archive --remove-after 2024-Dec-31 --config ./config.json +``` + +### Import Between Backends + +Import jobs from one archive backend to another (e.g., file to S3, file to SQLite): + +```bash +./archive-manager --import \ + --src-config '{"kind":"file","path":"./var/job-archive"}' \ + --dst-config '{"kind":"s3","endpoint":"https://s3.example.com","bucket":"archive","access-key":"...","secret-key":"..."}' +``` + +### Convert JSON to Parquet + +Convert a JSON job archive to Parquet format: + +```bash +./archive-manager --convert --format parquet \ + --src-config '{"kind":"file","path":"./var/job-archive"}' \ + --dst-config '{"kind":"file","path":"./var/parquet-archive"}' +``` + +The source (`--src-config`) is a standard archive backend config (file, S3, or SQLite). The destination (`--dst-config`) specifies where to write parquet files. + +### Convert Parquet to JSON + +Convert a Parquet archive back to JSON format: + +```bash +./archive-manager --convert --format json \ + --src-config '{"kind":"file","path":"./var/parquet-archive"}' \ + --dst-config '{"kind":"file","path":"./var/json-archive"}' +``` + +The source (`--src-config`) points to a directory or S3 bucket containing parquet files organized by cluster. The destination (`--dst-config`) is a standard archive backend config. 
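The same per-cluster Parquet layout can also be read programmatically through the `pkg/archive/parquet` package introduced here. The following is a minimal sketch using only the API shown in this change (`NewFileParquetSource`, `ReadParquetFile`, `ParquetRowToJob`); the archive path and the output formatting are illustrative, not prescribed:

```go
package main

import (
	"fmt"
	"log"

	pqarchive "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet"
)

func main() {
	// Illustrative path: a local Parquet archive as produced by `--convert --format parquet`.
	src := pqarchive.NewFileParquetSource("./var/parquet-archive")

	clusters, err := src.GetClusters()
	if err != nil {
		log.Fatalf("list clusters: %v", err)
	}

	for _, cluster := range clusters {
		files, err := src.ListParquetFiles(cluster)
		if err != nil {
			log.Fatalf("list parquet files for %s: %v", cluster, err)
		}
		for _, file := range files {
			data, err := src.ReadFile(file)
			if err != nil {
				log.Fatalf("read %s: %v", file, err)
			}
			rows, err := pqarchive.ReadParquetFile(data)
			if err != nil {
				log.Fatalf("parse %s: %v", file, err)
			}
			for i := range rows {
				// Convert a row back into job metadata plus its metric data.
				meta, jobData, err := pqarchive.ParquetRowToJob(&rows[i])
				if err != nil {
					log.Printf("skip row: %v", err)
					continue
				}
				fmt.Printf("%s: job %d with %d metrics\n", cluster, meta.JobID, len(*jobData))
			}
		}
	}
}
```

This mirrors what the Parquet-to-JSON converter does internally before handing each job to the destination archive backend.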
+ +### S3 Source/Destination Example + +Both conversion directions support S3: + +```bash +# JSON (S3) -> Parquet (local) +./archive-manager --convert --format parquet \ + --src-config '{"kind":"s3","endpoint":"https://s3.example.com","bucket":"json-archive","accessKey":"...","secretKey":"..."}' \ + --dst-config '{"kind":"file","path":"./var/parquet-archive"}' + +# Parquet (local) -> JSON (S3) +./archive-manager --convert --format json \ + --src-config '{"kind":"file","path":"./var/parquet-archive"}' \ + --dst-config '{"kind":"s3","endpoint":"https://s3.example.com","bucket":"json-archive","access-key":"...","secret-key":"..."}' +``` + +## Command-Line Options + +| Flag | Default | Description | +|------|---------|-------------| +| `-s` | `./var/job-archive` | Source job archive path (for info/validate/remove modes) | +| `--config` | `./config.json` | Path to config.json | +| `--loglevel` | `info` | Logging level: debug, info, warn, err, fatal, crit | +| `--logdate` | `false` | Add timestamps to log messages | +| `--validate` | `false` | Validate archive against JSON schema | +| `--remove-before` | | Remove jobs started before date (Format: 2006-Jan-02) | +| `--remove-after` | | Remove jobs started after date (Format: 2006-Jan-02) | +| `--import` | `false` | Import jobs between archive backends | +| `--convert` | `false` | Convert archive between JSON and Parquet formats | +| `--format` | `json` | Output format for conversion: `json` or `parquet` | +| `--max-file-size` | `512` | Max parquet file size in MB (only for parquet output) | +| `--src-config` | | Source config JSON (required for import/convert) | +| `--dst-config` | | Destination config JSON (required for import/convert) | + +## Parquet Archive Layout + +When converting to Parquet, the output is organized by cluster: + +``` +parquet-archive/ + clusterA/ + cluster.json + cc-archive-2025-01-20-001.parquet + cc-archive-2025-01-20-002.parquet + clusterB/ + cluster.json + cc-archive-2025-01-20-001.parquet +``` + +Each parquet file contains job metadata and gzip-compressed metric data. The `cluster.json` file preserves the cluster configuration from the source archive. + +## Round-Trip Conversion + +Archives can be converted from JSON to Parquet and back without data loss: + +```bash +# Original JSON archive +./archive-manager --convert --format parquet \ + --src-config '{"kind":"file","path":"./var/job-archive"}' \ + --dst-config '{"kind":"file","path":"./var/parquet-archive"}' + +# Convert back to JSON +./archive-manager --convert --format json \ + --src-config '{"kind":"file","path":"./var/parquet-archive"}' \ + --dst-config '{"kind":"file","path":"./var/json-archive"}' +``` diff --git a/tools/archive-manager/main.go b/tools/archive-manager/main.go index 918fc7c8..4a9094c0 100644 --- a/tools/archive-manager/main.go +++ b/tools/archive-manager/main.go @@ -23,6 +23,7 @@ import ( "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/pkg/archive" + pqarchive "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet" ccconf "github.com/ClusterCockpit/cc-lib/v2/ccConfig" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" ) @@ -372,10 +373,207 @@ func importArchive(srcBackend, dstBackend archive.ArchiveBackend, srcConfig stri return finalImported, finalFailed, nil } +// parseSourceConfig parses the common kind/path/s3 fields from a config JSON string. 
+type sourceConfig struct { + Kind string `json:"kind"` + Path string `json:"path"` + Endpoint string `json:"endpoint"` + Bucket string `json:"bucket"` + AccessKey string `json:"accessKey"` + SecretKey string `json:"secretKey"` + Region string `json:"region"` + UsePathStyle bool `json:"usePathStyle"` +} + +// createParquetTarget creates a ParquetTarget from a parsed config. +func createParquetTarget(cfg sourceConfig) (pqarchive.ParquetTarget, error) { + switch cfg.Kind { + case "s3": + return pqarchive.NewS3Target(pqarchive.S3TargetConfig{ + Endpoint: cfg.Endpoint, + Bucket: cfg.Bucket, + AccessKey: cfg.AccessKey, + SecretKey: cfg.SecretKey, + Region: cfg.Region, + UsePathStyle: cfg.UsePathStyle, + }) + default: + return pqarchive.NewFileTarget(cfg.Path) + } +} + +// createParquetSource creates a ParquetSource from a parsed config. +func createParquetSource(cfg sourceConfig) (pqarchive.ParquetSource, error) { + switch cfg.Kind { + case "s3": + return pqarchive.NewS3ParquetSource(pqarchive.S3TargetConfig{ + Endpoint: cfg.Endpoint, + Bucket: cfg.Bucket, + AccessKey: cfg.AccessKey, + SecretKey: cfg.SecretKey, + Region: cfg.Region, + UsePathStyle: cfg.UsePathStyle, + }) + default: + if cfg.Path == "" { + return nil, fmt.Errorf("file source: path is required") + } + return pqarchive.NewFileParquetSource(cfg.Path), nil + } +} + +// convertJSONToParquet converts a JSON archive backend to parquet format. +func convertJSONToParquet(srcBackend archive.ArchiveBackend, dstCfg sourceConfig, maxSizeMB int) error { + target, err := createParquetTarget(dstCfg) + if err != nil { + return fmt.Errorf("create parquet target: %w", err) + } + + cw := pqarchive.NewClusterAwareParquetWriter(target, maxSizeMB) + + // Transfer cluster configs + for _, clusterName := range srcBackend.GetClusters() { + clusterCfg, err := srcBackend.LoadClusterCfg(clusterName) + if err != nil { + cclog.Warnf("Convert: load cluster config %q: %v", clusterName, err) + continue + } + cw.SetClusterConfig(clusterName, clusterCfg) + } + + converted := 0 + failed := 0 + startTime := time.Now() + + for job := range srcBackend.Iter(true) { + if job.Meta == nil { + cclog.Warn("Skipping job with nil metadata") + failed++ + continue + } + if job.Data == nil { + cclog.Warnf("Job %d has no metric data, skipping", job.Meta.JobID) + failed++ + continue + } + + row, err := pqarchive.JobToParquetRow(job.Meta, job.Data) + if err != nil { + cclog.Warnf("Convert job %d: %v", job.Meta.JobID, err) + failed++ + continue + } + if err := cw.AddJob(*row); err != nil { + cclog.Errorf("Add job %d to writer: %v", job.Meta.JobID, err) + failed++ + continue + } + converted++ + + if converted%1000 == 0 { + cclog.Infof("Converted %d jobs so far...", converted) + } + } + + if err := cw.Close(); err != nil { + return fmt.Errorf("close parquet writer: %w", err) + } + + elapsed := time.Since(startTime) + cclog.Infof("JSON->Parquet conversion completed in %s: %d jobs converted, %d failed", + formatDuration(elapsed), converted, failed) + return nil +} + +// convertParquetToJSON converts a parquet archive to a JSON archive backend. 
+func convertParquetToJSON(srcCfg sourceConfig, dstBackend archive.ArchiveBackend) error { + src, err := createParquetSource(srcCfg) + if err != nil { + return fmt.Errorf("create parquet source: %w", err) + } + + clusters, err := src.GetClusters() + if err != nil { + return fmt.Errorf("list clusters: %w", err) + } + + converted := 0 + failed := 0 + skipped := 0 + startTime := time.Now() + + for _, cluster := range clusters { + // Transfer cluster config + clusterCfg, err := src.ReadClusterConfig(cluster) + if err != nil { + cclog.Warnf("Convert: read cluster config %q: %v", cluster, err) + } else { + if err := dstBackend.StoreClusterCfg(cluster, clusterCfg); err != nil { + cclog.Warnf("Convert: store cluster config %q: %v", cluster, err) + } else { + cclog.Infof("Imported cluster config for %s", cluster) + } + } + + // Read and convert parquet files + files, err := src.ListParquetFiles(cluster) + if err != nil { + cclog.Errorf("Convert: list parquet files for %q: %v", cluster, err) + continue + } + + for _, file := range files { + data, err := src.ReadFile(file) + if err != nil { + cclog.Errorf("Convert: read file %q: %v", file, err) + failed++ + continue + } + + rows, err := pqarchive.ReadParquetFile(data) + if err != nil { + cclog.Errorf("Convert: parse parquet file %q: %v", file, err) + failed++ + continue + } + + cclog.Infof("Processing %s: %d jobs", file, len(rows)) + + for _, row := range rows { + meta, jobData, err := pqarchive.ParquetRowToJob(&row) + if err != nil { + cclog.Warnf("Convert row to job: %v", err) + failed++ + continue + } + + if dstBackend.Exists(meta) { + skipped++ + continue + } + + if err := dstBackend.ImportJob(meta, jobData); err != nil { + cclog.Warnf("Import job %d: %v", meta.JobID, err) + failed++ + continue + } + converted++ + } + } + } + + elapsed := time.Since(startTime) + cclog.Infof("Parquet->JSON conversion completed in %s: %d jobs converted, %d skipped, %d failed", + formatDuration(elapsed), converted, skipped, failed) + return nil +} + func main() { var srcPath, flagConfigFile, flagLogLevel, flagRemoveCluster, flagRemoveAfter, flagRemoveBefore string var flagSrcConfig, flagDstConfig string - var flagLogDateTime, flagValidate, flagImport bool + var flagLogDateTime, flagValidate, flagImport, flagConvert bool + var flagFormat string + var flagMaxFileSize int flag.StringVar(&srcPath, "s", "./var/job-archive", "Specify the source job archive path. Default is ./var/job-archive") flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") @@ -386,6 +584,9 @@ func main() { flag.StringVar(&flagRemoveAfter, "remove-after", "", "Remove all jobs with start time after date (Format: 2006-Jan-04)") flag.BoolVar(&flagValidate, "validate", false, "Set this flag to validate a job archive against the json schema") flag.BoolVar(&flagImport, "import", false, "Import jobs from source archive to destination archive") + flag.BoolVar(&flagConvert, "convert", false, "Convert archive between JSON and Parquet formats") + flag.StringVar(&flagFormat, "format", "json", "Output format for conversion: 'json' or 'parquet'") + flag.IntVar(&flagMaxFileSize, "max-file-size", 512, "Max parquet file size in MB (only for parquet output)") flag.StringVar(&flagSrcConfig, "src-config", "", "Source archive backend configuration (JSON), e.g. '{\"kind\":\"file\",\"path\":\"./archive\"}'") flag.StringVar(&flagDstConfig, "dst-config", "", "Destination archive backend configuration (JSON), e.g. 
'{\"kind\":\"sqlite\",\"dbPath\":\"./archive.db\"}'") flag.Parse() @@ -429,6 +630,49 @@ func main() { os.Exit(0) } + // Handle convert mode + if flagConvert { + if flagSrcConfig == "" || flagDstConfig == "" { + cclog.Fatal("Both --src-config and --dst-config must be specified for convert mode") + } + + var srcCfg, dstCfg sourceConfig + if err := json.Unmarshal([]byte(flagSrcConfig), &srcCfg); err != nil { + cclog.Fatalf("Failed to parse source config: %s", err.Error()) + } + if err := json.Unmarshal([]byte(flagDstConfig), &dstCfg); err != nil { + cclog.Fatalf("Failed to parse destination config: %s", err.Error()) + } + + switch flagFormat { + case "parquet": + // JSON archive -> Parquet: source is an archive backend + cclog.Info("Convert mode: JSON -> Parquet") + srcBackend, err := archive.InitBackend(json.RawMessage(flagSrcConfig)) + if err != nil { + cclog.Fatalf("Failed to initialize source backend: %s", err.Error()) + } + if err := convertJSONToParquet(srcBackend, dstCfg, flagMaxFileSize); err != nil { + cclog.Fatalf("Conversion failed: %s", err.Error()) + } + case "json": + // Parquet -> JSON archive: destination is an archive backend + cclog.Info("Convert mode: Parquet -> JSON") + dstBackend, err := archive.InitBackend(json.RawMessage(flagDstConfig)) + if err != nil { + cclog.Fatalf("Failed to initialize destination backend: %s", err.Error()) + } + if err := convertParquetToJSON(srcCfg, dstBackend); err != nil { + cclog.Fatalf("Conversion failed: %s", err.Error()) + } + default: + cclog.Fatalf("Unknown format %q: must be 'json' or 'parquet'", flagFormat) + } + + cclog.Info("Conversion finished successfully") + os.Exit(0) + } + ccconf.Init(flagConfigFile) // Load and check main configuration diff --git a/web/frontend/package-lock.json b/web/frontend/package-lock.json index e3451242..6962dc1b 100644 --- a/web/frontend/package-lock.json +++ b/web/frontend/package-lock.json @@ -250,7 +250,6 @@ "cpu": [ "arm" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -264,7 +263,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -278,7 +276,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -292,7 +289,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -306,7 +302,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -320,7 +315,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -334,7 +328,6 @@ "cpu": [ "arm" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -348,7 +341,6 @@ "cpu": [ "arm" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -362,7 +354,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -376,7 +367,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -390,7 +380,6 @@ "cpu": [ "loong64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -404,7 +393,6 @@ "cpu": [ "loong64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -418,7 +406,6 @@ "cpu": [ "ppc64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -432,7 +419,6 @@ "cpu": [ "ppc64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -446,7 +432,6 @@ "cpu": [ "riscv64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -460,7 +445,6 @@ "cpu": [ "riscv64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -474,7 +458,6 @@ "cpu": [ "s390x" ], - "dev": true, 
"license": "MIT", "optional": true, "os": [ @@ -488,7 +471,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -502,7 +484,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -516,7 +497,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -530,7 +510,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -544,7 +523,6 @@ "cpu": [ "arm64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -558,7 +536,6 @@ "cpu": [ "ia32" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -572,7 +549,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -586,7 +562,6 @@ "cpu": [ "x64" ], - "dev": true, "license": "MIT", "optional": true, "os": [ @@ -837,7 +812,6 @@ "version": "2.3.3", "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.3.tgz", "integrity": "sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw==", - "dev": true, "hasInstallScript": true, "license": "MIT", "optional": true, diff --git a/web/frontend/rollup.config.mjs b/web/frontend/rollup.config.mjs index 6b7cf884..8aca6161 100644 --- a/web/frontend/rollup.config.mjs +++ b/web/frontend/rollup.config.mjs @@ -75,5 +75,6 @@ export default [ entrypoint('analysis', 'src/analysis.entrypoint.js'), entrypoint('status', 'src/status.entrypoint.js'), entrypoint('dashpublic', 'src/dashpublic.entrypoint.js'), - entrypoint('config', 'src/config.entrypoint.js') + entrypoint('config', 'src/config.entrypoint.js'), + entrypoint('logs', 'src/logs.entrypoint.js') ]; diff --git a/web/frontend/src/Header.svelte b/web/frontend/src/Header.svelte index c173a9f4..862981fd 100644 --- a/web/frontend/src/Header.svelte +++ b/web/frontend/src/Header.svelte @@ -135,6 +135,16 @@ listOptions: true, menu: "Info", }, + { + title: "Logs", + // svelte-ignore state_referenced_locally + requiredRole: roles.admin, + href: "/monitoring/logs", + icon: "journal-text", + perCluster: false, + listOptions: false, + menu: "Info", + }, ]; /* State Init */ diff --git a/web/frontend/src/Logs.root.svelte b/web/frontend/src/Logs.root.svelte new file mode 100644 index 00000000..ccadabce --- /dev/null +++ b/web/frontend/src/Logs.root.svelte @@ -0,0 +1,254 @@ + + + + +{#if !isAdmin} + + +

Access denied. Admin privileges required.

+
+
+{:else} + + +
+ + + {#each timeRanges as tr} + + {/each} + + + + + + {#each levels as lv} + + {/each} + + + + + Lines + + + + + + + + + + { + if (e.key === "Enter") fetchLogs(); + }} + /> + + + + + + Auto + + {#each refreshIntervals as ri} + + {/each} + + + + {#if entries.length > 0} + {entries.length} entries + {/if} +
+
+ + {#if error} +
{error}
+ {/if} + +
+ + + + + + + + + + {#each entries as entry} + + + + + + {:else} + {#if !loading && !error} + + {/if} + {/each} + +
TimestampLevelMessage
{formatTimestamp(entry.timestamp)}{levelName(entry.priority)}{entry.message}
No log entries found
+
+
+
+{/if} diff --git a/web/frontend/src/generic/JobList.svelte b/web/frontend/src/generic/JobList.svelte index 278f189e..3ccbb560 100644 --- a/web/frontend/src/generic/JobList.svelte +++ b/web/frontend/src/generic/JobList.svelte @@ -305,7 +305,7 @@ {#if $jobsStore.fetching || !$jobsStore.data} -
+
diff --git a/web/frontend/src/logs.entrypoint.js b/web/frontend/src/logs.entrypoint.js new file mode 100644 index 00000000..5eb3c0c8 --- /dev/null +++ b/web/frontend/src/logs.entrypoint.js @@ -0,0 +1,10 @@ +import { mount } from 'svelte'; +import {} from './header.entrypoint.js' +import Logs from './Logs.root.svelte' + +mount(Logs, { + target: document.getElementById('svelte-app'), + props: { + isAdmin: isAdmin, + } +}) diff --git a/web/frontend/src/systems/NodeList.svelte b/web/frontend/src/systems/NodeList.svelte index da196b82..403a8030 100644 --- a/web/frontend/src/systems/NodeList.svelte +++ b/web/frontend/src/systems/NodeList.svelte @@ -104,7 +104,7 @@ let itemsPerPage = $derived(usePaging ? (ccconfig?.nodeList_nodesPerPage || 10) : 10); let paging = $derived({ itemsPerPage, page }); - const nodesQuery = $derived(queryStore({ + const nodesStore = $derived(queryStore({ client: client, query: nodeListQuery, variables: { @@ -122,7 +122,7 @@ requestPolicy: "network-only", // Resolution queries are cached, but how to access them? For now: reload on every change })); - const matchedNodes = $derived($nodesQuery?.data?.nodeMetricsList?.totalNodes || 0); + const matchedNodes = $derived($nodesStore?.data?.nodeMetricsList?.totalNodes || 0); /* Effects */ $effect(() => { @@ -135,7 +135,7 @@ } = document.documentElement; // Add 100 px offset to trigger load earlier - if (scrollTop + clientHeight >= scrollHeight - 100 && $nodesQuery?.data?.nodeMetricsList?.hasNextPage) { + if (scrollTop + clientHeight >= scrollHeight - 100 && $nodesStore?.data?.nodeMetricsList?.hasNextPage) { page += 1 }; }); @@ -143,21 +143,30 @@ }); $effect(() => { - if ($nodesQuery?.data) { + if ($nodesStore?.data) { untrack(() => { - handleNodes($nodesQuery?.data?.nodeMetricsList?.items); + handleNodes($nodesStore?.data?.nodeMetricsList?.items); }); selectedMetrics = [...pendingSelectedMetrics]; // Trigger Rerender in NodeListRow Only After Data is Fetched }; }); $effect(() => { - // Triggers (Except Paging) + // Update NodeListRows metrics only: Keep ordered nodes on page 1 from, to pendingSelectedMetrics, selectedResolution + // Continous Scroll: Paging if parameters change: Existing entries will not match new selections + if (!usePaging) { + nodes = []; + page = 1; + } + }); + + $effect(() => { + // Update NodeListRows metrics only: Keep ordered nodes on page 1 hostnameFilter, hoststateFilter // Continous Scroll: Paging if parameters change: Existing entries will not match new selections - // Nodes Array Reset in HandleNodes func + nodes = []; if (!usePaging) { page = 1; } @@ -228,7 +237,7 @@ style="padding-top: {headerPaddingTop}px;" > {cluster} Node Info - {#if $nodesQuery.fetching} + {#if $nodesStore.fetching} {/if} @@ -245,22 +254,24 @@ - {#if $nodesQuery.error} + {#if $nodesStore.error} - {$nodesQuery.error.message} + {$nodesStore.error.message} {:else} {#each nodes as nodeData (nodeData.host)} - + {:else} - - No nodes found - + {#if !$nodesStore.fetching} + + No nodes found + + {/if} {/each} {/if} - {#if $nodesQuery.fetching || !$nodesQuery.data} + {#if $nodesStore.fetching || !$nodesStore.data}
diff --git a/web/frontend/src/systems/nodelist/NodeInfo.svelte b/web/frontend/src/systems/nodelist/NodeInfo.svelte index 39716ca2..4b616f10 100644 --- a/web/frontend/src/systems/nodelist/NodeInfo.svelte +++ b/web/frontend/src/systems/nodelist/NodeInfo.svelte @@ -51,6 +51,8 @@ /* Derived */ // Not at least one returned, selected metric: NodeHealth warning + const fetchInfo = $derived(dataHealth.includes('fetching')); + // Not at least one returned, selected metric: NodeHealth warning const healthWarn = $derived(!dataHealth.includes(true)); // At least one non-returned selected metric: Metric config error? const metricWarn = $derived(dataHealth.includes(false)); @@ -84,10 +86,17 @@ - {#if healthWarn} + {#if fetchInfo} + + + + + {:else if healthWarn} - Jobs + Info