From c4499965590879044596ede8a4897059085fa3d2 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 09:43:41 +0100 Subject: [PATCH 1/8] Add context to log message Entire-Checkpoint: 55d95cdef0d4 --- internal/repository/stats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 18f3e120..8c0ebc57 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -360,7 +360,7 @@ func (r *JobRepository) JobsStats( var jobs, users, walltime, nodes, nodeHours, cores, coreHours, accs, accHours, runningJobs, shortJobs sql.NullInt64 if err := row.Scan(&jobs, &users, &walltime, &nodes, &nodeHours, &cores, &coreHours, &accs, &accHours, &runningJobs, &shortJobs); err != nil { - cclog.Warn("Error while scanning rows") + cclog.Warnf("Error scanning job statistics row: %v", err) return nil, err } From 8f10eba771710f07c5d77ec404181e22aab1099b Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 10:05:09 +0100 Subject: [PATCH 2/8] Extend CLAUDE.md Entire-Checkpoint: 17cdf997acff --- CLAUDE.md | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/CLAUDE.md b/CLAUDE.md index ef030cd5..20260cbc 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -229,6 +229,7 @@ The backend supports a NATS-based API as an alternative to the REST API for job ### Setup 1. Configure NATS client connection in `config.json`: + ```json { "nats": { @@ -240,6 +241,7 @@ The backend supports a NATS-based API as an alternative to the REST API for job ``` 2. 
Configure API subjects in `config.json` under `main`: + ```json { "main": { @@ -252,6 +254,7 @@ The backend supports a NATS-based API as an alternative to the REST API for job } } ``` + - `subject-job-event` (required): NATS subject for job start/stop events - `subject-node-state` (required): NATS subject for node state updates - `job-concurrency` (optional, default: 8): Number of concurrent worker goroutines for job events @@ -264,19 +267,23 @@ Messages use **InfluxDB line protocol** format with the following structure: #### Job Events **Start Job:** + ``` job,function=start_job event="{\"jobId\":123,\"user\":\"alice\",\"cluster\":\"test\", ...}" 1234567890000000000 ``` **Stop Job:** + ``` job,function=stop_job event="{\"jobId\":123,\"cluster\":\"test\",\"startTime\":1234567890,\"stopTime\":1234571490,\"jobState\":\"completed\"}" 1234571490000000000 ``` **Tags:** + - `function`: Either `start_job` or `stop_job` **Fields:** + - `event`: JSON payload containing job data (see REST API documentation for schema) #### Node State Updates @@ -307,6 +314,28 @@ job,function=stop_job event="{\"jobId\":123,\"cluster\":\"test\",\"startTime\":1 - Messages are logged; no responses are sent back to publishers - If NATS client is unavailable, API subscriptions are skipped (logged as warning) +## Development Guidelines + +### Performance + +This application processes large volumes of HPC monitoring data (metrics, job +records, archives) at scale. All code changes must prioritize maximum throughput +and minimal latency. Avoid unnecessary allocations, prefer streaming over +buffering, and be mindful of lock contention. When in doubt, benchmark. + +### Change Impact Analysis + +For any significant change, you MUST: + +1. **Check all call paths**: Trace every caller of modified functions to ensure + correctness is preserved throughout the call chain. +2. 
**Evaluate side effects**: Identify and verify all side effects — database + writes, cache invalidations, channel sends, goroutine lifecycle changes, file + I/O, and external API calls. +3. **Consider concurrency implications**: This codebase uses goroutines and + channels extensively. Verify that changes do not introduce races, deadlocks, + or contention bottlenecks. + ## Dependencies - Go 1.24.0+ (check go.mod for exact version) From 3328d2ca115aeb312b7ad62c6dc83b2169c82f21 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 10:37:32 +0100 Subject: [PATCH 3/8] Update go version in CLAUDE.md --- CLAUDE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CLAUDE.md b/CLAUDE.md index 20260cbc..061d056b 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -338,7 +338,7 @@ For any significant change, you MUST: ## Dependencies -- Go 1.24.0+ (check go.mod for exact version) +- Go 1.25.0+ (check go.mod for exact version) - Node.js (for frontend builds) - SQLite 3 (only supported database) - Optional: NATS server for NATS API integration From c1d51959d5f7bcef97510072154cb5a6b3565698 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 10:57:06 +0100 Subject: [PATCH 4/8] Change determineState to enforce priority order Make exception if node is idle + down, then final state is idle Entire-Checkpoint: 92c797737df8 --- internal/api/node.go | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/internal/api/node.go b/internal/api/node.go index 06787cd1..c6727866 100644 --- a/internal/api/node.go +++ b/internal/api/node.go @@ -34,21 +34,26 @@ func metricListToNames(metricList map[string]*schema.Metric) []string { return names } -// this routine assumes that only one of them exists per node +// determineState resolves multiple states to a single state using priority order: +// allocated > reserved > idle > down > mixed. +// Exception: if both idle and down are present, idle is returned. 
func determineState(states []string) schema.SchedulerState { - for _, state := range states { - switch strings.ToLower(state) { - case "allocated": - return schema.NodeStateAllocated - case "reserved": - return schema.NodeStateReserved - case "idle": - return schema.NodeStateIdle - case "down": - return schema.NodeStateDown - case "mixed": - return schema.NodeStateMixed - } + stateSet := make(map[string]bool, len(states)) + for _, s := range states { + stateSet[strings.ToLower(s)] = true + } + + switch { + case stateSet["allocated"]: + return schema.NodeStateAllocated + case stateSet["reserved"]: + return schema.NodeStateReserved + case stateSet["idle"]: + return schema.NodeStateIdle + case stateSet["down"]: + return schema.NodeStateDown + case stateSet["mixed"]: + return schema.NodeStateMixed } return schema.NodeStateUnknown From 33beb3c8066743a43341b30c591d7ec1e5f46412 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Wed, 18 Mar 2026 11:07:36 +0100 Subject: [PATCH 5/8] fix: simplify stats query condition - caused expensive subquery without need in frontend --- internal/repository/stats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/repository/stats.go b/internal/repository/stats.go index 8c0ebc57..0bc85109 100644 --- a/internal/repository/stats.go +++ b/internal/repository/stats.go @@ -156,7 +156,7 @@ func (r *JobRepository) buildStatsQuery( columns = append(columns, "COUNT(*) as totalJobs") - if need("totalUsers") && col != "job.hpc_user" { + if need("totalUsers") { columns = append(columns, "COUNT(DISTINCT job.hpc_user) AS totalUsers") } else { columns = append(columns, "0 AS totalUsers") From 8b0881fb177622f076dfa6b5b9cecce31c8e9860 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 11:20:10 +0100 Subject: [PATCH 6/8] Exclude down nodes from HealthCheck Entire-Checkpoint: 0c3347168c79 --- internal/api/nats.go | 26 ++++++++++++++++++++------ internal/api/node.go | 32 ++++++++++++++++++++++++-------- 2 files 
changed, 44 insertions(+), 14 deletions(-) diff --git a/internal/api/nats.go b/internal/api/nats.go index db229a04..db33c0e2 100644 --- a/internal/api/nats.go +++ b/internal/api/nats.go @@ -402,12 +402,21 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) { repo := repository.GetNodeRepository() requestReceived := time.Now().Unix() - // Build nodeList per subcluster for health check + // Pre-compute node states; only include non-down nodes in health check + nodeStates := make(map[string]schema.SchedulerState, len(req.Nodes)) + for _, node := range req.Nodes { + nodeStates[node.Hostname] = determineState(node.States) + } + + // Build nodeList per subcluster for health check, skipping down nodes m := make(map[string][]string) metricNames := make(map[string][]string) healthResults := make(map[string]metricstore.HealthCheckResult) for _, node := range req.Nodes { + if nodeStates[node.Hostname] == schema.NodeStateDown { + continue + } if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil { m[sc] = append(m[sc], node.Hostname) } @@ -436,12 +445,17 @@ func (api *NatsAPI) processNodestateEvent(msg lp.CCMessage) { updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes)) for _, node := range req.Nodes { - state := determineState(node.States) - healthState := schema.MonitoringStateFailed + state := nodeStates[node.Hostname] + var healthState schema.MonitoringState var healthMetrics string - if result, ok := healthResults[node.Hostname]; ok { - healthState = result.State - healthMetrics = result.HealthMetrics + if state == schema.NodeStateDown { + healthState = schema.MonitoringStateFull + } else { + healthState = schema.MonitoringStateFailed + if result, ok := healthResults[node.Hostname]; ok { + healthState = result.State + healthMetrics = result.HealthMetrics + } } nodeState := schema.NodeStateDB{ TimeStamp: requestReceived, diff --git a/internal/api/node.go b/internal/api/node.go index c6727866..2b86fbad 100644 --- 
a/internal/api/node.go +++ b/internal/api/node.go @@ -36,7 +36,7 @@ func metricListToNames(metricList map[string]*schema.Metric) []string { // determineState resolves multiple states to a single state using priority order: // allocated > reserved > idle > down > mixed. -// Exception: if both idle and down are present, idle is returned. +// Exception: if both idle and down are present, down is returned. func determineState(states []string) schema.SchedulerState { stateSet := make(map[string]bool, len(states)) for _, s := range states { @@ -48,6 +48,8 @@ func determineState(states []string) schema.SchedulerState { return schema.NodeStateAllocated case stateSet["reserved"]: return schema.NodeStateReserved + case stateSet["idle"] && stateSet["down"]: + return schema.NodeStateDown case stateSet["idle"]: return schema.NodeStateIdle case stateSet["down"]: @@ -84,14 +86,23 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { requestReceived := time.Now().Unix() repo := repository.GetNodeRepository() + // Step 1: Pre-compute node states; only include non-down nodes in health check + nodeStates := make(map[string]schema.SchedulerState, len(req.Nodes)) + for _, node := range req.Nodes { + nodeStates[node.Hostname] = determineState(node.States) + } + m := make(map[string][]string) metricNames := make(map[string][]string) healthResults := make(map[string]metricstore.HealthCheckResult) startMs := time.Now() - // Step 1: Build nodeList and metricList per subcluster + // Step 2: Build nodeList and metricList per subcluster, skipping down nodes for _, node := range req.Nodes { + if nodeStates[node.Hostname] == schema.NodeStateDown { + continue + } if sc, err := archive.GetSubClusterByNode(req.Cluster, node.Hostname); err == nil { m[sc] = append(m[sc], node.Hostname) } @@ -104,7 +115,7 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { } } - // Step 2: Determine which metric store to query and perform health check + // 
Step 3: Determine which metric store to query and perform health check healthRepo, err := metricdispatch.GetHealthCheckRepo(req.Cluster) if err != nil { cclog.Warnf("updateNodeStates: no metric store for cluster %s, skipping health check: %v", req.Cluster, err) @@ -123,12 +134,17 @@ func (api *RestAPI) updateNodeStates(rw http.ResponseWriter, r *http.Request) { updates := make([]repository.NodeStateUpdate, 0, len(req.Nodes)) for _, node := range req.Nodes { - state := determineState(node.States) - healthState := schema.MonitoringStateFailed + state := nodeStates[node.Hostname] + var healthState schema.MonitoringState var healthMetrics string - if result, ok := healthResults[node.Hostname]; ok { - healthState = result.State - healthMetrics = result.HealthMetrics + if state == schema.NodeStateDown { + healthState = schema.MonitoringStateFull + } else { + healthState = schema.MonitoringStateFailed + if result, ok := healthResults[node.Hostname]; ok { + healthState = result.State + healthMetrics = result.HealthMetrics + } } nodeState := schema.NodeStateDB{ TimeStamp: requestReceived, From bb6915771d829e2e503f507bb1081227ab1ed019 Mon Sep 17 00:00:00 2001 From: Christoph Kluge Date: Wed, 18 Mar 2026 13:23:21 +0100 Subject: [PATCH 7/8] fix: clarify title --- web/frontend/src/status/dashdetails/HealthDash.svelte | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/web/frontend/src/status/dashdetails/HealthDash.svelte b/web/frontend/src/status/dashdetails/HealthDash.svelte index a30552b1..94965510 100644 --- a/web/frontend/src/status/dashdetails/HealthDash.svelte +++ b/web/frontend/src/status/dashdetails/HealthDash.svelte @@ -161,7 +161,7 @@
- + {#if $statusQuery?.fetching} @@ -222,7 +222,7 @@
{#key refinedHealthData}

- Current {cluster.charAt(0).toUpperCase() + cluster.slice(1)} Node Health + Current {cluster.charAt(0).toUpperCase() + cluster.slice(1)} Metric Health

Date: Wed, 18 Mar 2026 17:32:16 +0100 Subject: [PATCH 8/8] fix: reduce memory usage in parquet checkpoint archiver Stream CheckpointFile trees directly to parquet rows instead of materializing all rows in a giant intermediate slice. This eliminates ~1.9GB per host of redundant allocations (repeated string headers) and removes the expensive sort on millions of 104-byte structs. Key changes: - Replace flattenCheckpointFile + sortParquetRows + WriteHostRows with streaming WriteCheckpointFile that walks the tree with sorted keys - Reduce results channel buffer from len(hostEntries) to 2 for back-pressure (at most NumWorkers+2 results in flight) - Workers send CheckpointFile trees instead of []ParquetMetricRow - Write rows in small 1024-element batches via reusable buffer Co-Authored-By: Claude Opus 4.6 Entire-Checkpoint: f31dc1847539 --- pkg/metricstore/archive.go | 40 ++++-- pkg/metricstore/parquetArchive.go | 176 +++++++++++-------------- pkg/metricstore/parquetArchive_test.go | 59 +++++++-- 3 files changed, 155 insertions(+), 120 deletions(-) diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index deebc869..7eb1b72f 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -168,8 +168,9 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) { // archiveCheckpoints archives checkpoint files to Parquet format. // Produces one Parquet file per cluster: //.parquet -// Each host's rows are written as a separate row group to avoid accumulating -// all data in memory at once. +// Workers load checkpoint files from disk and send CheckpointFile trees on a +// back-pressured channel. The main thread streams each tree directly to Parquet +// rows without materializing all rows in memory. 
func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, error) { cclog.Info("[METRICSTORE]> start archiving checkpoints to parquet") startTime := time.Now() @@ -192,14 +193,16 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err return totalFiles, err } - // Stream per-host rows to parquet writer via worker pool + // Workers load checkpoint files from disk; main thread writes to parquet. type hostResult struct { - rows []ParquetMetricRow - files []string // checkpoint filenames to delete after successful write - dir string // checkpoint directory for this host + checkpoints []*CheckpointFile + hostname string + files []string // checkpoint filenames to delete after successful write + dir string // checkpoint directory for this host } - results := make(chan hostResult, len(hostEntries)) + // Small buffer provides back-pressure: at most NumWorkers+2 results in flight. + results := make(chan hostResult, 2) work := make(chan struct { dir, host string }, Keys.NumWorkers) @@ -212,14 +215,19 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err go func() { defer wg.Done() for item := range work { - rows, files, err := archiveCheckpointsToParquet(item.dir, cluster, item.host, from) + checkpoints, files, err := loadCheckpointFiles(item.dir, from) if err != nil { cclog.Errorf("[METRICSTORE]> error reading checkpoints for %s/%s: %s", cluster, item.host, err.Error()) atomic.AddInt32(&errs, 1) continue } - if len(rows) > 0 { - results <- hostResult{rows: rows, files: files, dir: item.dir} + if len(checkpoints) > 0 { + results <- hostResult{ + checkpoints: checkpoints, + hostname: item.host, + files: files, + dir: item.dir, + } } } }() @@ -240,7 +248,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err close(results) }() - // Open streaming writer and write each host's rows as a row group + // Open streaming writer and write each host's checkpoint files as a row 
group parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) writer, err := newParquetArchiveWriter(parquetFile) if err != nil { @@ -259,9 +267,13 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err for r := range results { if writeErr == nil { - sortParquetRows(r.rows) - if err := writer.WriteHostRows(r.rows); err != nil { - writeErr = err + // Stream each checkpoint file directly to parquet rows. + // Each checkpoint is processed and discarded before the next. + for _, cf := range r.checkpoints { + if err := writer.WriteCheckpointFile(cf, cluster, r.hostname, "node", ""); err != nil { + writeErr = err + break + } } } // Always track files for deletion (even if write failed, we still drain) diff --git a/pkg/metricstore/parquetArchive.go b/pkg/metricstore/parquetArchive.go index 18ee2c64..260bd8dd 100644 --- a/pkg/metricstore/parquetArchive.go +++ b/pkg/metricstore/parquetArchive.go @@ -14,7 +14,6 @@ import ( "path/filepath" "sort" - cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" pq "github.com/parquet-go/parquet-go" ) @@ -32,37 +31,6 @@ type ParquetMetricRow struct { Value float32 `parquet:"value"` } -// flattenCheckpointFile recursively converts a CheckpointFile tree into Parquet rows. -// The scope path is built from the hierarchy: host level is "node", then child names -// map to scope/scope_id (e.g., "socket0" → scope="socket", scope_id="0"). 
-func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string, rows []ParquetMetricRow) []ParquetMetricRow { - for metricName, cm := range cf.Metrics { - ts := cm.Start - for _, v := range cm.Data { - if !v.IsNaN() { - rows = append(rows, ParquetMetricRow{ - Cluster: cluster, - Hostname: hostname, - Metric: metricName, - Scope: scope, - ScopeID: scopeID, - Timestamp: ts, - Frequency: cm.Frequency, - Value: float32(v), - }) - } - ts += cm.Frequency - } - } - - for childName, childCf := range cf.Children { - childScope, childScopeID := parseScopeFromName(childName) - rows = flattenCheckpointFile(childCf, cluster, hostname, childScope, childScopeID, rows) - } - - return rows -} - // parseScopeFromName infers scope and scope_id from a child level name. // Examples: "socket0" → ("socket", "0"), "core12" → ("core", "12"), // "a0" (accelerator) → ("accelerator", "0"). @@ -93,15 +61,17 @@ func parseScopeFromName(name string) (string, string) { } // parquetArchiveWriter supports incremental writes to a Parquet file. -// Each call to WriteHostRows writes one row group (typically one host's data), -// avoiding accumulation of all rows in memory. +// Uses streaming writes to avoid accumulating all rows in memory. type parquetArchiveWriter struct { writer *pq.GenericWriter[ParquetMetricRow] bw *bufio.Writer f *os.File + batch []ParquetMetricRow // reusable batch buffer count int } +const parquetBatchSize = 1024 + // newParquetArchiveWriter creates a streaming Parquet writer with Zstd compression. 
func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) { if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil { @@ -119,31 +89,85 @@ func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) { pq.Compression(&pq.Zstd), ) - return &parquetArchiveWriter{writer: writer, bw: bw, f: f}, nil + return &parquetArchiveWriter{ + writer: writer, + bw: bw, + f: f, + batch: make([]ParquetMetricRow, 0, parquetBatchSize), + }, nil } -// WriteHostRows sorts rows by (metric, timestamp) in-place, writes them, -// and flushes to create a separate row group. -func (w *parquetArchiveWriter) WriteHostRows(rows []ParquetMetricRow) error { - sort.Slice(rows, func(i, j int) bool { - if rows[i].Metric != rows[j].Metric { - return rows[i].Metric < rows[j].Metric - } - return rows[i].Timestamp < rows[j].Timestamp - }) +// WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows, +// writing metrics in sorted order without materializing all rows in memory. +// Produces one row group per call (typically one host's data). +func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error { + w.writeLevel(cf, cluster, hostname, scope, scopeID) - if _, err := w.writer.Write(rows); err != nil { - return fmt.Errorf("writing parquet rows: %w", err) + // Flush remaining batch + if len(w.batch) > 0 { + if _, err := w.writer.Write(w.batch); err != nil { + return fmt.Errorf("writing parquet rows: %w", err) + } + w.count += len(w.batch) + w.batch = w.batch[:0] } if err := w.writer.Flush(); err != nil { return fmt.Errorf("flushing parquet row group: %w", err) } - w.count += len(rows) return nil } +// writeLevel recursively writes metrics from a CheckpointFile level. +// Metric names and child names are sorted for deterministic, compression-friendly output. 
+func (w *parquetArchiveWriter) writeLevel(cf *CheckpointFile, cluster, hostname, scope, scopeID string) { + // Sort metric names for deterministic order + metricNames := make([]string, 0, len(cf.Metrics)) + for name := range cf.Metrics { + metricNames = append(metricNames, name) + } + sort.Strings(metricNames) + + for _, metricName := range metricNames { + cm := cf.Metrics[metricName] + ts := cm.Start + for _, v := range cm.Data { + if !v.IsNaN() { + w.batch = append(w.batch, ParquetMetricRow{ + Cluster: cluster, + Hostname: hostname, + Metric: metricName, + Scope: scope, + ScopeID: scopeID, + Timestamp: ts, + Frequency: cm.Frequency, + Value: float32(v), + }) + + if len(w.batch) >= parquetBatchSize { + w.writer.Write(w.batch) + w.count += len(w.batch) + w.batch = w.batch[:0] + } + } + ts += cm.Frequency + } + } + + // Sort child names for deterministic order + childNames := make([]string, 0, len(cf.Children)) + for name := range cf.Children { + childNames = append(childNames, name) + } + sort.Strings(childNames) + + for _, childName := range childNames { + childScope, childScopeID := parseScopeFromName(childName) + w.writeLevel(cf.Children[childName], cluster, hostname, childScope, childScopeID) + } +} + // Close finalises the Parquet file (footer, buffered I/O, file handle). func (w *parquetArchiveWriter) Close() error { if err := w.writer.Close(); err != nil { @@ -159,16 +183,6 @@ func (w *parquetArchiveWriter) Close() error { return w.f.Close() } -// sortParquetRows sorts rows by (metric, timestamp) in-place. -func sortParquetRows(rows []ParquetMetricRow) { - sort.Slice(rows, func(i, j int) bool { - if rows[i].Metric != rows[j].Metric { - return rows[i].Metric < rows[j].Metric - } - return rows[i].Timestamp < rows[j].Timestamp - }) -} - // loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns // a CheckpointFile. Used by the Parquet archiver to read checkpoint data // before converting it to Parquet format. 
@@ -218,22 +232,10 @@ func loadCheckpointFileFromDisk(filename string) (*CheckpointFile, error) { } } -// estimateRowCount estimates the number of Parquet rows a CheckpointFile will produce. -// Used for pre-allocating the rows slice to avoid repeated append doubling. -func estimateRowCount(cf *CheckpointFile) int { - n := 0 - for _, cm := range cf.Metrics { - n += len(cm.Data) - } - for _, child := range cf.Children { - n += estimateRowCount(child) - } - return n -} - -// archiveCheckpointsToParquet reads checkpoint files for a host directory, -// converts them to Parquet rows. Returns the rows and filenames that were processed. -func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) { +// loadCheckpointFiles reads checkpoint files for a host directory and returns +// the loaded CheckpointFiles and their filenames. Processes one file at a time +// to avoid holding all checkpoint data in memory simultaneously. +func loadCheckpointFiles(dir string, from int64) ([]*CheckpointFile, []string, error) { entries, err := os.ReadDir(dir) if err != nil { return nil, nil, err @@ -248,36 +250,18 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu return nil, nil, nil } - // First pass: load checkpoints and estimate total rows for pre-allocation. 
- type loaded struct { - cf *CheckpointFile - filename string - } - var checkpoints []loaded - totalEstimate := 0 + var checkpoints []*CheckpointFile + var processedFiles []string for _, checkpoint := range files { filename := filepath.Join(dir, checkpoint) cf, err := loadCheckpointFileFromDisk(filename) if err != nil { - cclog.Warnf("[METRICSTORE]> skipping unreadable checkpoint %s: %v", filename, err) continue } - totalEstimate += estimateRowCount(cf) - checkpoints = append(checkpoints, loaded{cf: cf, filename: checkpoint}) + checkpoints = append(checkpoints, cf) + processedFiles = append(processedFiles, checkpoint) } - if len(checkpoints) == 0 { - return nil, nil, nil - } - - rows := make([]ParquetMetricRow, 0, totalEstimate) - processedFiles := make([]string, 0, len(checkpoints)) - - for _, cp := range checkpoints { - rows = flattenCheckpointFile(cp.cf, cluster, host, "node", "", rows) - processedFiles = append(processedFiles, cp.filename) - } - - return rows, processedFiles, nil + return checkpoints, processedFiles, nil } diff --git a/pkg/metricstore/parquetArchive_test.go b/pkg/metricstore/parquetArchive_test.go index e10b0d03..6295c1c7 100644 --- a/pkg/metricstore/parquetArchive_test.go +++ b/pkg/metricstore/parquetArchive_test.go @@ -44,7 +44,7 @@ func TestParseScopeFromName(t *testing.T) { } } -func TestFlattenCheckpointFile(t *testing.T) { +func TestWriteCheckpointFile(t *testing.T) { cf := &CheckpointFile{ From: 1000, To: 1060, @@ -69,17 +69,55 @@ func TestFlattenCheckpointFile(t *testing.T) { }, } - rows := flattenCheckpointFile(cf, "fritz", "node001", "node", "", nil) + tmpDir := t.TempDir() + parquetFile := filepath.Join(tmpDir, "test.parquet") + writer, err := newParquetArchiveWriter(parquetFile) + if err != nil { + t.Fatal(err) + } + + if err := writer.WriteCheckpointFile(cf, "fritz", "node001", "node", ""); err != nil { + t.Fatal(err) + } + if err := writer.Close(); err != nil { + t.Fatal(err) + } // cpu_load: 2 non-NaN values at node scope // 
mem_bw: 2 non-NaN values at socket0 scope - if len(rows) != 4 { - t.Fatalf("expected 4 rows, got %d", len(rows)) + if writer.count != 4 { + t.Fatalf("expected 4 rows written, got %d", writer.count) + } + + // Read back and verify + f, err := os.Open(parquetFile) + if err != nil { + t.Fatal(err) + } + defer f.Close() + + stat, _ := f.Stat() + pf, err := pq.OpenFile(f, stat.Size()) + if err != nil { + t.Fatal(err) + } + + reader := pq.NewGenericReader[ParquetMetricRow](pf) + readRows := make([]ParquetMetricRow, 100) + n, err := reader.Read(readRows) + if err != nil && n == 0 { + t.Fatal(err) + } + readRows = readRows[:n] + reader.Close() + + if n != 4 { + t.Fatalf("expected 4 rows, got %d", n) } // Verify a node-scope row found := false - for _, r := range rows { + for _, r := range readRows { if r.Metric == "cpu_load" && r.Timestamp == 1000 { found = true if r.Cluster != "fritz" || r.Hostname != "node001" || r.Scope != "node" || r.Value != 0.5 { @@ -93,7 +131,7 @@ func TestFlattenCheckpointFile(t *testing.T) { // Verify a socket-scope row found = false - for _, r := range rows { + for _, r := range readRows { if r.Metric == "mem_bw" && r.Scope == "socket" && r.ScopeID == "0" { found = true } @@ -153,7 +191,7 @@ func TestParquetArchiveRoundtrip(t *testing.T) { // Archive to Parquet archiveDir := filepath.Join(tmpDir, "archive") - rows, files, err := archiveCheckpointsToParquet(cpDir, "testcluster", "node001", 2000) + checkpoints, files, err := loadCheckpointFiles(cpDir, 2000) if err != nil { t.Fatal(err) } @@ -166,9 +204,10 @@ func TestParquetArchiveRoundtrip(t *testing.T) { if err != nil { t.Fatal(err) } - sortParquetRows(rows) - if err := writer.WriteHostRows(rows); err != nil { - t.Fatal(err) + for _, cp := range checkpoints { + if err := writer.WriteCheckpointFile(cp, "testcluster", "node001", "node", ""); err != nil { + t.Fatal(err) + } } if err := writer.Close(); err != nil { t.Fatal(err)