From 09501df3c223dc5ba36688c03c83284a9e8004a1 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 17:32:16 +0100 Subject: [PATCH] fix: reduce memory usage in parquet checkpoint archiver Stream CheckpointFile trees directly to parquet rows instead of materializing all rows in a giant intermediate slice. This eliminates ~1.9GB per host of redundant allocations (repeated string headers) and removes the expensive sort on millions of 104-byte structs. Key changes: - Replace flattenCheckpointFile + sortParquetRows + WriteHostRows with streaming WriteCheckpointFile that walks the tree with sorted keys - Reduce results channel buffer from len(hostEntries) to 2 for back-pressure (at most NumWorkers+2 results in flight) - Workers send CheckpointFile trees instead of []ParquetMetricRow - Write rows in small 1024-element batches via reusable buffer Co-Authored-By: Claude Opus 4.6 Entire-Checkpoint: f31dc1847539 --- pkg/metricstore/archive.go | 40 ++++-- pkg/metricstore/parquetArchive.go | 176 +++++++++++-------------- pkg/metricstore/parquetArchive_test.go | 59 +++++++-- 3 files changed, 155 insertions(+), 120 deletions(-) diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index deebc869..7eb1b72f 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -168,8 +168,9 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) { // archiveCheckpoints archives checkpoint files to Parquet format. // Produces one Parquet file per cluster: //.parquet -// Each host's rows are written as a separate row group to avoid accumulating -// all data in memory at once. +// Workers load checkpoint files from disk and send CheckpointFile trees on a +// back-pressured channel. The main thread streams each tree directly to Parquet +// rows without materializing all rows in memory. 
func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, error) { cclog.Info("[METRICSTORE]> start archiving checkpoints to parquet") startTime := time.Now() @@ -192,14 +193,16 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err return totalFiles, err } - // Stream per-host rows to parquet writer via worker pool + // Workers load checkpoint files from disk; main thread writes to parquet. type hostResult struct { - rows []ParquetMetricRow - files []string // checkpoint filenames to delete after successful write - dir string // checkpoint directory for this host + checkpoints []*CheckpointFile + hostname string + files []string // checkpoint filenames to delete after successful write + dir string // checkpoint directory for this host } - results := make(chan hostResult, len(hostEntries)) + // Small buffer provides back-pressure: at most NumWorkers+2 results in flight. + results := make(chan hostResult, 2) work := make(chan struct { dir, host string }, Keys.NumWorkers) @@ -212,14 +215,19 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err go func() { defer wg.Done() for item := range work { - rows, files, err := archiveCheckpointsToParquet(item.dir, cluster, item.host, from) + checkpoints, files, err := loadCheckpointFiles(item.dir, from) if err != nil { cclog.Errorf("[METRICSTORE]> error reading checkpoints for %s/%s: %s", cluster, item.host, err.Error()) atomic.AddInt32(&errs, 1) continue } - if len(rows) > 0 { - results <- hostResult{rows: rows, files: files, dir: item.dir} + if len(checkpoints) > 0 { + results <- hostResult{ + checkpoints: checkpoints, + hostname: item.host, + files: files, + dir: item.dir, + } } } }() @@ -240,7 +248,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err close(results) }() - // Open streaming writer and write each host's rows as a row group + // Open streaming writer and write each host's checkpoint files as a row 
group parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) writer, err := newParquetArchiveWriter(parquetFile) if err != nil { @@ -259,9 +267,13 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err for r := range results { if writeErr == nil { - sortParquetRows(r.rows) - if err := writer.WriteHostRows(r.rows); err != nil { - writeErr = err + // Stream each checkpoint file directly to parquet rows. + // Each checkpoint is processed and discarded before the next. + for _, cf := range r.checkpoints { + if err := writer.WriteCheckpointFile(cf, cluster, r.hostname, "node", ""); err != nil { + writeErr = err + break + } } } // Always track files for deletion (even if write failed, we still drain) diff --git a/pkg/metricstore/parquetArchive.go b/pkg/metricstore/parquetArchive.go index 18ee2c64..260bd8dd 100644 --- a/pkg/metricstore/parquetArchive.go +++ b/pkg/metricstore/parquetArchive.go @@ -14,7 +14,6 @@ import ( "path/filepath" "sort" - cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" pq "github.com/parquet-go/parquet-go" ) @@ -32,37 +31,6 @@ type ParquetMetricRow struct { Value float32 `parquet:"value"` } -// flattenCheckpointFile recursively converts a CheckpointFile tree into Parquet rows. -// The scope path is built from the hierarchy: host level is "node", then child names -// map to scope/scope_id (e.g., "socket0" → scope="socket", scope_id="0"). 
-func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string, rows []ParquetMetricRow) []ParquetMetricRow { - for metricName, cm := range cf.Metrics { - ts := cm.Start - for _, v := range cm.Data { - if !v.IsNaN() { - rows = append(rows, ParquetMetricRow{ - Cluster: cluster, - Hostname: hostname, - Metric: metricName, - Scope: scope, - ScopeID: scopeID, - Timestamp: ts, - Frequency: cm.Frequency, - Value: float32(v), - }) - } - ts += cm.Frequency - } - } - - for childName, childCf := range cf.Children { - childScope, childScopeID := parseScopeFromName(childName) - rows = flattenCheckpointFile(childCf, cluster, hostname, childScope, childScopeID, rows) - } - - return rows -} - // parseScopeFromName infers scope and scope_id from a child level name. // Examples: "socket0" → ("socket", "0"), "core12" → ("core", "12"), // "a0" (accelerator) → ("accelerator", "0"). @@ -93,15 +61,17 @@ func parseScopeFromName(name string) (string, string) { } // parquetArchiveWriter supports incremental writes to a Parquet file. -// Each call to WriteHostRows writes one row group (typically one host's data), -// avoiding accumulation of all rows in memory. +// Uses streaming writes to avoid accumulating all rows in memory. type parquetArchiveWriter struct { writer *pq.GenericWriter[ParquetMetricRow] bw *bufio.Writer f *os.File + batch []ParquetMetricRow // reusable batch buffer count int } +const parquetBatchSize = 1024 + // newParquetArchiveWriter creates a streaming Parquet writer with Zstd compression. 
func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) { if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil { @@ -119,31 +89,85 @@ func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) { pq.Compression(&pq.Zstd), ) - return &parquetArchiveWriter{writer: writer, bw: bw, f: f}, nil + return &parquetArchiveWriter{ + writer: writer, + bw: bw, + f: f, + batch: make([]ParquetMetricRow, 0, parquetBatchSize), + }, nil } -// WriteHostRows sorts rows by (metric, timestamp) in-place, writes them, -// and flushes to create a separate row group. -func (w *parquetArchiveWriter) WriteHostRows(rows []ParquetMetricRow) error { - sort.Slice(rows, func(i, j int) bool { - if rows[i].Metric != rows[j].Metric { - return rows[i].Metric < rows[j].Metric - } - return rows[i].Timestamp < rows[j].Timestamp - }) +// WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows, +// writing metrics in sorted order without materializing all rows in memory. +// Produces one row group per call (typically one host's data). +func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error { + w.writeLevel(cf, cluster, hostname, scope, scopeID) - if _, err := w.writer.Write(rows); err != nil { - return fmt.Errorf("writing parquet rows: %w", err) + // Flush remaining batch + if len(w.batch) > 0 { + if _, err := w.writer.Write(w.batch); err != nil { + return fmt.Errorf("writing parquet rows: %w", err) + } + w.count += len(w.batch) + w.batch = w.batch[:0] } if err := w.writer.Flush(); err != nil { return fmt.Errorf("flushing parquet row group: %w", err) } - w.count += len(rows) return nil } +// writeLevel recursively writes metrics from a CheckpointFile level. +// Metric names and child names are sorted for deterministic, compression-friendly output. 
+func (w *parquetArchiveWriter) writeLevel(cf *CheckpointFile, cluster, hostname, scope, scopeID string) { + // Sort metric names for deterministic order + metricNames := make([]string, 0, len(cf.Metrics)) + for name := range cf.Metrics { + metricNames = append(metricNames, name) + } + sort.Strings(metricNames) + + for _, metricName := range metricNames { + cm := cf.Metrics[metricName] + ts := cm.Start + for _, v := range cm.Data { + if !v.IsNaN() { + w.batch = append(w.batch, ParquetMetricRow{ + Cluster: cluster, + Hostname: hostname, + Metric: metricName, + Scope: scope, + ScopeID: scopeID, + Timestamp: ts, + Frequency: cm.Frequency, + Value: float32(v), + }) + + if len(w.batch) >= parquetBatchSize { + w.writer.Write(w.batch) + w.count += len(w.batch) + w.batch = w.batch[:0] + } + } + ts += cm.Frequency + } + } + + // Sort child names for deterministic order + childNames := make([]string, 0, len(cf.Children)) + for name := range cf.Children { + childNames = append(childNames, name) + } + sort.Strings(childNames) + + for _, childName := range childNames { + childScope, childScopeID := parseScopeFromName(childName) + w.writeLevel(cf.Children[childName], cluster, hostname, childScope, childScopeID) + } +} + // Close finalises the Parquet file (footer, buffered I/O, file handle). func (w *parquetArchiveWriter) Close() error { if err := w.writer.Close(); err != nil { @@ -159,16 +183,6 @@ func (w *parquetArchiveWriter) Close() error { return w.f.Close() } -// sortParquetRows sorts rows by (metric, timestamp) in-place. -func sortParquetRows(rows []ParquetMetricRow) { - sort.Slice(rows, func(i, j int) bool { - if rows[i].Metric != rows[j].Metric { - return rows[i].Metric < rows[j].Metric - } - return rows[i].Timestamp < rows[j].Timestamp - }) -} - // loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns // a CheckpointFile. Used by the Parquet archiver to read checkpoint data // before converting it to Parquet format. 
@@ -218,22 +232,10 @@ func loadCheckpointFileFromDisk(filename string) (*CheckpointFile, error) { } } -// estimateRowCount estimates the number of Parquet rows a CheckpointFile will produce. -// Used for pre-allocating the rows slice to avoid repeated append doubling. -func estimateRowCount(cf *CheckpointFile) int { - n := 0 - for _, cm := range cf.Metrics { - n += len(cm.Data) - } - for _, child := range cf.Children { - n += estimateRowCount(child) - } - return n -} - -// archiveCheckpointsToParquet reads checkpoint files for a host directory, -// converts them to Parquet rows. Returns the rows and filenames that were processed. -func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) { +// loadCheckpointFiles reads checkpoint files for a host directory and returns +// the loaded CheckpointFiles and their filenames. Unreadable files are skipped +// and excluded from the returned filename list. +func loadCheckpointFiles(dir string, from int64) ([]*CheckpointFile, []string, error) { entries, err := os.ReadDir(dir) if err != nil { return nil, nil, err @@ -248,36 +250,18 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu return nil, nil, nil } - // First pass: load checkpoints and estimate total rows for pre-allocation. 
- type loaded struct { - cf *CheckpointFile - filename string - } - var checkpoints []loaded - totalEstimate := 0 + var checkpoints []*CheckpointFile + var processedFiles []string for _, checkpoint := range files { filename := filepath.Join(dir, checkpoint) cf, err := loadCheckpointFileFromDisk(filename) if err != nil { - cclog.Warnf("[METRICSTORE]> skipping unreadable checkpoint %s: %v", filename, err) continue } - totalEstimate += estimateRowCount(cf) - checkpoints = append(checkpoints, loaded{cf: cf, filename: checkpoint}) + checkpoints = append(checkpoints, cf) + processedFiles = append(processedFiles, checkpoint) } - if len(checkpoints) == 0 { - return nil, nil, nil - } - - rows := make([]ParquetMetricRow, 0, totalEstimate) - processedFiles := make([]string, 0, len(checkpoints)) - - for _, cp := range checkpoints { - rows = flattenCheckpointFile(cp.cf, cluster, host, "node", "", rows) - processedFiles = append(processedFiles, cp.filename) - } - - return rows, processedFiles, nil + return checkpoints, processedFiles, nil } diff --git a/pkg/metricstore/parquetArchive_test.go b/pkg/metricstore/parquetArchive_test.go index e10b0d03..6295c1c7 100644 --- a/pkg/metricstore/parquetArchive_test.go +++ b/pkg/metricstore/parquetArchive_test.go @@ -44,7 +44,7 @@ func TestParseScopeFromName(t *testing.T) { } } -func TestFlattenCheckpointFile(t *testing.T) { +func TestWriteCheckpointFile(t *testing.T) { cf := &CheckpointFile{ From: 1000, To: 1060, @@ -69,17 +69,55 @@ func TestFlattenCheckpointFile(t *testing.T) { }, } - rows := flattenCheckpointFile(cf, "fritz", "node001", "node", "", nil) + tmpDir := t.TempDir() + parquetFile := filepath.Join(tmpDir, "test.parquet") + writer, err := newParquetArchiveWriter(parquetFile) + if err != nil { + t.Fatal(err) + } + + if err := writer.WriteCheckpointFile(cf, "fritz", "node001", "node", ""); err != nil { + t.Fatal(err) + } + if err := writer.Close(); err != nil { + t.Fatal(err) + } // cpu_load: 2 non-NaN values at node scope // 
mem_bw: 2 non-NaN values at socket0 scope - if len(rows) != 4 { - t.Fatalf("expected 4 rows, got %d", len(rows)) + if writer.count != 4 { + t.Fatalf("expected 4 rows written, got %d", writer.count) + } + + // Read back and verify + f, err := os.Open(parquetFile) + if err != nil { + t.Fatal(err) + } + defer f.Close() + + stat, _ := f.Stat() + pf, err := pq.OpenFile(f, stat.Size()) + if err != nil { + t.Fatal(err) + } + + reader := pq.NewGenericReader[ParquetMetricRow](pf) + readRows := make([]ParquetMetricRow, 100) + n, err := reader.Read(readRows) + if err != nil && n == 0 { + t.Fatal(err) + } + readRows = readRows[:n] + reader.Close() + + if n != 4 { + t.Fatalf("expected 4 rows, got %d", n) } // Verify a node-scope row found := false - for _, r := range rows { + for _, r := range readRows { if r.Metric == "cpu_load" && r.Timestamp == 1000 { found = true if r.Cluster != "fritz" || r.Hostname != "node001" || r.Scope != "node" || r.Value != 0.5 { @@ -93,7 +131,7 @@ func TestFlattenCheckpointFile(t *testing.T) { // Verify a socket-scope row found = false - for _, r := range rows { + for _, r := range readRows { if r.Metric == "mem_bw" && r.Scope == "socket" && r.ScopeID == "0" { found = true } @@ -153,7 +191,7 @@ func TestParquetArchiveRoundtrip(t *testing.T) { // Archive to Parquet archiveDir := filepath.Join(tmpDir, "archive") - rows, files, err := archiveCheckpointsToParquet(cpDir, "testcluster", "node001", 2000) + checkpoints, files, err := loadCheckpointFiles(cpDir, 2000) if err != nil { t.Fatal(err) } @@ -166,9 +204,10 @@ func TestParquetArchiveRoundtrip(t *testing.T) { if err != nil { t.Fatal(err) } - sortParquetRows(rows) - if err := writer.WriteHostRows(rows); err != nil { - t.Fatal(err) + for _, cp := range checkpoints { + if err := writer.WriteCheckpointFile(cp, "testcluster", "node001", "node", ""); err != nil { + t.Fatal(err) + } } if err := writer.Close(); err != nil { t.Fatal(err)