Implement the following plan:

# Fix: Memory Escalation in flattenCheckpointFile (68GB+)

## Context

Production gops shows `flattenCheckpointFile` allocating 68GB+ (74.89% of memory). The archiving pipeline accumulates ALL metric data from ALL hosts into a single `[]ParquetMetricRow` slice before writing to Parquet. For large HPC clusters this is catastrophic. Additionally, the `SortingWriterConfig` in the parquet writer buffers everything again internally.

## Root Cause

Two-layer unbounded accumulation:

1. **`archive.go:239-242`**: `allRows = append(allRows, r.rows...)` merges every host's rows into one giant slice
2. **`parquetArchive.go:108-116`**: `SortingWriterConfig` creates a sorting writer that buffers ALL rows until `Close()`
3. **`parquetArchive.go:199`**: `var rows []ParquetMetricRow` starts at zero capacity, grows via append doubling

Peak memory = (all hosts' rows) + (sorting writer copy) + (append overhead) = ~3x raw data size.

## Fix: Stream per-host to parquet writer

Instead of accumulating all rows, write each host's data as a separate row group.

### Step 1: Add streaming parquet writer (`parquetArchive.go`)

Replace `writeParquetArchive(filename, rows)` with a struct that supports incremental writes (a hedged sketch follows the plan):

```go
type parquetArchiveWriter struct {
	writer *pq.GenericWriter[ParquetMetricRow]
	bw     *bufio.Writer
	f      *os.File
	count  int
}

func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error)
func (w *parquetArchiveWriter) WriteHostRows(rows []ParquetMetricRow) error // Write + Flush (creates row group)
func (w *parquetArchiveWriter) Close() error
```

- **Remove `SortingWriterConfig`** - no global sort buffer
- Sort each host's rows in-place with `sort.Slice` before writing (cheap: single host data)
- Each `Flush()` creates a separate row group per host

### Step 2: Add row count estimation (`parquetArchive.go`)

```go
func estimateRowCount(cf *CheckpointFile) int
```

Pre-allocate the `rows` slice in `archiveCheckpointsToParquet` to avoid append doubling per host (see the sketch after the plan).

### Step 3: Restructure `archiveCheckpoints` (`archive.go`)

Change from:

```
workers → channel → accumulate allRows → writeParquetArchive(allRows)
```

To (see the sketch after the plan):

```
open writer → workers → channel → for each host: sort rows, writer.WriteHostRows(rows) → close writer
```

- Only one host's rows in memory at a time
- Track `files`/`dir` for deletion separately (don't retain rows)
- Check `writer.count > 0` instead of `len(allRows) == 0`

### Step 4: Update test (`parquetArchive_test.go`)

- `TestParquetArchiveRoundtrip`: use the new streaming writer API (see the sketch after the plan)
- Keep `archiveCheckpointsToParquet` returning rows (it's per-host, manageable size)

## Files to Modify

- **`pkg/metricstore/parquetArchive.go`**: Add `parquetArchiveWriter`, `estimateRowCount`; remove `writeParquetArchive`; add `"sort"` import
- **`pkg/metricstore/archive.go`**: Restructure `archiveCheckpoints` to stream
- **`pkg/metricstore/parquetArchive_test.go`**: Update roundtrip test

## Memory Impact

- **Before**: All hosts in memory (~40GB for 256 nodes) + sorting buffer (~40GB) = 68GB+
- **After**: One host at a time (~16MB) + parquet page buffer (~1MB) = ~17MB peak

## Sorting Tradeoff

The output changes from one globally-sorted row group to N row groups (one per host), each internally sorted by (metric, timestamp). This is actually better for ClusterCockpit's per-host query patterns (it enables row group skipping).

## Verification

```bash
go test -v ./pkg/metricstore/...
```

Also verify with `go vet ./pkg/metricstore/...` for correctness.
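A minimal sketch of the Step 1 streaming writer, assuming the `pq` alias refers to `github.com/parquet-go/parquet-go` and that `ParquetMetricRow` exposes `Metric` and `Timestamp` fields to sort on (both assumptions, not taken from the code):

```go
import (
	"bufio"
	"os"
	"sort"

	pq "github.com/parquet-go/parquet-go"
)

type parquetArchiveWriter struct {
	writer *pq.GenericWriter[ParquetMetricRow]
	bw     *bufio.Writer
	f      *os.File
	count  int
}

func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) {
	f, err := os.Create(filename)
	if err != nil {
		return nil, err
	}
	bw := bufio.NewWriter(f)
	// No SortingWriterConfig: rows arrive pre-sorted per host, so the writer
	// never buffers more than the current row group.
	return &parquetArchiveWriter{
		writer: pq.NewGenericWriter[ParquetMetricRow](bw),
		bw:     bw,
		f:      f,
	}, nil
}

// WriteHostRows sorts one host's rows in place, writes them, and flushes so
// each host lands in its own row group. Only this slice is live in memory.
func (w *parquetArchiveWriter) WriteHostRows(rows []ParquetMetricRow) error {
	if len(rows) == 0 {
		return nil
	}
	// Field names are assumptions; the sort key is (metric, timestamp) per the plan.
	sort.Slice(rows, func(i, j int) bool {
		if rows[i].Metric != rows[j].Metric {
			return rows[i].Metric < rows[j].Metric
		}
		return rows[i].Timestamp < rows[j].Timestamp
	})
	if _, err := w.writer.Write(rows); err != nil {
		return err
	}
	w.count += len(rows)
	return w.writer.Flush() // ends the current row group
}

func (w *parquetArchiveWriter) Close() error {
	if err := w.writer.Close(); err != nil {
		w.f.Close()
		return err
	}
	if err := w.bw.Flush(); err != nil {
		w.f.Close()
		return err
	}
	return w.f.Close()
}
```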
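For Step 2, a hypothetical sketch of the row count estimation; the `CheckpointFile` field names (`Metrics`, each carrying a `Data` slice) are purely illustrative assumptions:

```go
// estimateRowCount sums the number of samples across all metrics in a
// checkpoint file so the per-host rows slice can be allocated once.
// The Metrics/Data field names are assumptions for this sketch.
func estimateRowCount(cf *CheckpointFile) int {
	n := 0
	for _, m := range cf.Metrics {
		n += len(m.Data)
	}
	return n
}
```

Inside `archiveCheckpointsToParquet`, the slice would then be created as `rows := make([]ParquetMetricRow, 0, estimateRowCount(cf))`, so appends never trigger capacity doubling for a single host.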
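For Step 3, a rough sketch of the restructured consumer loop in `archiveCheckpoints`, assuming a per-host result struct with `rows`, `files`, and `err` fields coming off the worker channel (names are illustrative, not taken from the code):

```go
w, err := newParquetArchiveWriter(archivePath) // archivePath is illustrative
if err != nil {
	return err
}
var toDelete []string // retain only file paths for deletion, never the rows
for r := range results { // one result per host from the workers
	if r.err != nil {
		w.Close()
		return r.err
	}
	// Only this host's rows are resident; they are sorted, written, and
	// flushed into their own row group, then become garbage-collectable.
	if err := w.WriteHostRows(r.rows); err != nil {
		w.Close()
		return err
	}
	toDelete = append(toDelete, r.files...)
}
if err := w.Close(); err != nil {
	return err
}
if w.count == 0 {
	return nil // nothing archived; replaces the old len(allRows) == 0 check
}
// ... delete the collected checkpoint files/dirs as before
```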
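And for Step 4, one possible shape of the updated roundtrip test, assuming parquet-go's `ReadFile` helper is used to read the archive back; `makeTestRows` is a hypothetical fixture helper, not an existing function:

```go
import (
	"path/filepath"
	"testing"

	pq "github.com/parquet-go/parquet-go"
)

func TestParquetArchiveRoundtrip(t *testing.T) {
	path := filepath.Join(t.TempDir(), "archive.parquet")

	w, err := newParquetArchiveWriter(path)
	if err != nil {
		t.Fatal(err)
	}
	// makeTestRows is a hypothetical helper producing one host's rows.
	want := makeTestRows("host1")
	if err := w.WriteHostRows(want); err != nil {
		t.Fatal(err)
	}
	if err := w.Close(); err != nil {
		t.Fatal(err)
	}

	// Read all row groups back into a slice and compare counts.
	got, err := pq.ReadFile[ParquetMetricRow](path)
	if err != nil {
		t.Fatal(err)
	}
	if len(got) != len(want) {
		t.Fatalf("expected %d rows, got %d", len(want), len(got))
	}
}
```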
If you need specific details from before exiting plan mode (like exact code snippets, error messages, or content you generated), read the full transcript at: /Users/jan/.claude/projects/-Users-jan-prg-CC-cc-backend/71340843-de3d-4e83-9dcb-2fc130c50e0d.jsonl

---

Are there any other cases with memory spikes using the Parquet Writer, e.g. in the nodestate retention?

---

[Request interrupted by user for tool use]