From 02f82c2c0bb0bf3cffa7704f267c24878f517128 Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Wed, 18 Mar 2026 05:08:37 +0100 Subject: [PATCH] fix: Prevent memory spikes in parquet writer for metricstore move policy Entire-Checkpoint: 4a675b8352a2 --- pkg/metricstore/archive.go | 57 +++++++++---- pkg/metricstore/parquetArchive.go | 108 ++++++++++++++++++++----- pkg/metricstore/parquetArchive_test.go | 10 ++- 3 files changed, 140 insertions(+), 35 deletions(-) diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 916736d0..5c1a8f0f 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -168,6 +168,8 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) { // archiveCheckpoints archives checkpoint files to Parquet format. // Produces one Parquet file per cluster: //.parquet +// Each host's rows are written as a separate row group to avoid accumulating +// all data in memory at once. func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, error) { clusterEntries, err := os.ReadDir(checkpointsDir) if err != nil { @@ -187,7 +189,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err return totalFiles, err } - // Collect rows from all hosts in this cluster using worker pool + // Stream per-host rows to parquet writer via worker pool type hostResult struct { rows []ParquetMetricRow files []string // checkpoint filenames to delete after successful write @@ -235,32 +237,57 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err close(results) }() - // Collect all rows and file info - var allRows []ParquetMetricRow - var allResults []hostResult + // Open streaming writer and write each host's rows as a row group + parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) + writer, err := newParquetArchiveWriter(parquetFile) + if err != nil { + // Drain results channel to unblock workers + for range results { + } + return totalFiles, fmt.Errorf("creating parquet writer for cluster %s: %w", cluster, err) + } + + type deleteItem struct { + dir string + files []string + } + var toDelete []deleteItem + writeErr := error(nil) + for r := range results { - allRows = append(allRows, r.rows...) - allResults = append(allResults, r) + if writeErr == nil { + sortParquetRows(r.rows) + if err := writer.WriteHostRows(r.rows); err != nil { + writeErr = err + } + } + // Always track files for deletion (even if write failed, we still drain) + toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files}) + } + + if err := writer.Close(); err != nil && writeErr == nil { + writeErr = err } if errs > 0 { return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster) } - if len(allRows) == 0 { + if writer.count == 0 { + // No data written — remove empty file + os.Remove(parquetFile) continue } - // Write one Parquet file per cluster - parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) - if err := writeParquetArchive(parquetFile, allRows); err != nil { - return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, err) + if writeErr != nil { + os.Remove(parquetFile) + return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writeErr) } // Delete archived checkpoint files - for _, result := range allResults { - for _, file := range result.files { - filename := filepath.Join(result.dir, file) + for _, item := range toDelete { + for _, file := range item.files { + filename := filepath.Join(item.dir, file) if err := os.Remove(filename); err != nil { cclog.Warnf("[METRICSTORE]> could not remove archived checkpoint %s: %v", filename, err) } else { @@ -270,7 +297,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err } cclog.Infof("[METRICSTORE]> archived %d rows from %d files for cluster %s to %s", - len(allRows), totalFiles, cluster, parquetFile) + writer.count, totalFiles, cluster, parquetFile) } return totalFiles, nil diff --git a/pkg/metricstore/parquetArchive.go b/pkg/metricstore/parquetArchive.go index 420ee4e5..18ee2c64 100644 --- a/pkg/metricstore/parquetArchive.go +++ b/pkg/metricstore/parquetArchive.go @@ -12,6 +12,7 @@ import ( "fmt" "os" "path/filepath" + "sort" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" pq "github.com/parquet-go/parquet-go" @@ -91,43 +92,81 @@ func parseScopeFromName(name string) (string, string) { return name, "" } -// writeParquetArchive writes rows to a Parquet file with Zstd compression. -func writeParquetArchive(filename string, rows []ParquetMetricRow) error { +// parquetArchiveWriter supports incremental writes to a Parquet file. +// Each call to WriteHostRows writes one row group (typically one host's data), +// avoiding accumulation of all rows in memory. +type parquetArchiveWriter struct { + writer *pq.GenericWriter[ParquetMetricRow] + bw *bufio.Writer + f *os.File + count int +} + +// newParquetArchiveWriter creates a streaming Parquet writer with Zstd compression. +func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) { if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil { - return fmt.Errorf("creating archive directory: %w", err) + return nil, fmt.Errorf("creating archive directory: %w", err) } f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) if err != nil { - return fmt.Errorf("creating parquet file: %w", err) + return nil, fmt.Errorf("creating parquet file: %w", err) } - defer f.Close() bw := bufio.NewWriterSize(f, 1<<20) // 1MB write buffer writer := pq.NewGenericWriter[ParquetMetricRow](bw, pq.Compression(&pq.Zstd), - pq.SortingWriterConfig(pq.SortingColumns( - pq.Ascending("cluster"), - pq.Ascending("hostname"), - pq.Ascending("metric"), - pq.Ascending("timestamp"), - )), ) - if _, err := writer.Write(rows); err != nil { + return &parquetArchiveWriter{writer: writer, bw: bw, f: f}, nil +} + +// WriteHostRows sorts rows by (metric, timestamp) in-place, writes them, +// and flushes to create a separate row group. +func (w *parquetArchiveWriter) WriteHostRows(rows []ParquetMetricRow) error { + sort.Slice(rows, func(i, j int) bool { + if rows[i].Metric != rows[j].Metric { + return rows[i].Metric < rows[j].Metric + } + return rows[i].Timestamp < rows[j].Timestamp + }) + + if _, err := w.writer.Write(rows); err != nil { return fmt.Errorf("writing parquet rows: %w", err) } - if err := writer.Close(); err != nil { + if err := w.writer.Flush(); err != nil { + return fmt.Errorf("flushing parquet row group: %w", err) + } + + w.count += len(rows) + return nil +} + +// Close finalises the Parquet file (footer, buffered I/O, file handle). +func (w *parquetArchiveWriter) Close() error { + if err := w.writer.Close(); err != nil { + w.f.Close() return fmt.Errorf("closing parquet writer: %w", err) } - if err := bw.Flush(); err != nil { + if err := w.bw.Flush(); err != nil { + w.f.Close() return fmt.Errorf("flushing parquet file: %w", err) } - return nil + return w.f.Close() +} + +// sortParquetRows sorts rows by (metric, timestamp) in-place. +func sortParquetRows(rows []ParquetMetricRow) { + sort.Slice(rows, func(i, j int) bool { + if rows[i].Metric != rows[j].Metric { + return rows[i].Metric < rows[j].Metric + } + return rows[i].Timestamp < rows[j].Timestamp + }) } // loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns @@ -179,6 +218,19 @@ func loadCheckpointFileFromDisk(filename string) (*CheckpointFile, error) { } } +// estimateRowCount estimates the number of Parquet rows a CheckpointFile will produce. +// Used for pre-allocating the rows slice to avoid repeated append doubling. +func estimateRowCount(cf *CheckpointFile) int { + n := 0 + for _, cm := range cf.Metrics { + n += len(cm.Data) + } + for _, child := range cf.Children { + n += estimateRowCount(child) + } + return n +} + // archiveCheckpointsToParquet reads checkpoint files for a host directory, // converts them to Parquet rows. Returns the rows and filenames that were processed. func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) { @@ -196,7 +248,13 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu return nil, nil, nil } - var rows []ParquetMetricRow + // First pass: load checkpoints and estimate total rows for pre-allocation. + type loaded struct { + cf *CheckpointFile + filename string + } + var checkpoints []loaded + totalEstimate := 0 for _, checkpoint := range files { filename := filepath.Join(dir, checkpoint) @@ -205,9 +263,21 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu cclog.Warnf("[METRICSTORE]> skipping unreadable checkpoint %s: %v", filename, err) continue } - - rows = flattenCheckpointFile(cf, cluster, host, "node", "", rows) + totalEstimate += estimateRowCount(cf) + checkpoints = append(checkpoints, loaded{cf: cf, filename: checkpoint}) } - return rows, files, nil + if len(checkpoints) == 0 { + return nil, nil, nil + } + + rows := make([]ParquetMetricRow, 0, totalEstimate) + processedFiles := make([]string, 0, len(checkpoints)) + + for _, cp := range checkpoints { + rows = flattenCheckpointFile(cp.cf, cluster, host, "node", "", rows) + processedFiles = append(processedFiles, cp.filename) + } + + return rows, processedFiles, nil } diff --git a/pkg/metricstore/parquetArchive_test.go b/pkg/metricstore/parquetArchive_test.go index d3d70c02..e10b0d03 100644 --- a/pkg/metricstore/parquetArchive_test.go +++ b/pkg/metricstore/parquetArchive_test.go @@ -162,7 +162,15 @@ func TestParquetArchiveRoundtrip(t *testing.T) { } parquetFile := filepath.Join(archiveDir, "testcluster", "1000.parquet") - if err := writeParquetArchive(parquetFile, rows); err != nil { + writer, err := newParquetArchiveWriter(parquetFile) + if err != nil { + t.Fatal(err) + } + sortParquetRows(rows) + if err := writer.WriteHostRows(rows); err != nil { + t.Fatal(err) + } + if err := writer.Close(); err != nil { t.Fatal(err) }