fix: Prevent memory spikes in parquet writer for metricstore move policy

Entire-Checkpoint: 4a675b8352a2
This commit is contained in:
2026-03-18 05:08:37 +01:00
parent 3314b8e284
commit 02f82c2c0b
3 changed files with 140 additions and 35 deletions

View File

@@ -12,6 +12,7 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
pq "github.com/parquet-go/parquet-go"
@@ -91,43 +92,81 @@ func parseScopeFromName(name string) (string, string) {
return name, ""
}
// writeParquetArchive writes rows to a Parquet file with Zstd compression.
func writeParquetArchive(filename string, rows []ParquetMetricRow) error {
// parquetArchiveWriter supports incremental writes to a Parquet file.
// Each call to WriteHostRows writes one row group (typically one host's data),
// avoiding accumulation of all rows in memory.
type parquetArchiveWriter struct {
	writer *pq.GenericWriter[ParquetMetricRow] // streaming Parquet writer; emits into bw
	bw     *bufio.Writer                       // buffered writer wrapping f to batch small writes
	f      *os.File                            // underlying output file; closed in Close
	count  int                                 // running total of rows written via WriteHostRows
}
// newParquetArchiveWriter creates a streaming Parquet writer with Zstd compression.
func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) {
if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil {
return fmt.Errorf("creating archive directory: %w", err)
return nil, fmt.Errorf("creating archive directory: %w", err)
}
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
if err != nil {
return fmt.Errorf("creating parquet file: %w", err)
return nil, fmt.Errorf("creating parquet file: %w", err)
}
defer f.Close()
bw := bufio.NewWriterSize(f, 1<<20) // 1MB write buffer
writer := pq.NewGenericWriter[ParquetMetricRow](bw,
pq.Compression(&pq.Zstd),
pq.SortingWriterConfig(pq.SortingColumns(
pq.Ascending("cluster"),
pq.Ascending("hostname"),
pq.Ascending("metric"),
pq.Ascending("timestamp"),
)),
)
if _, err := writer.Write(rows); err != nil {
return &parquetArchiveWriter{writer: writer, bw: bw, f: f}, nil
}
// WriteHostRows sorts rows by (metric, timestamp) in-place, writes them,
// and flushes to create a separate row group.
func (w *parquetArchiveWriter) WriteHostRows(rows []ParquetMetricRow) error {
sort.Slice(rows, func(i, j int) bool {
if rows[i].Metric != rows[j].Metric {
return rows[i].Metric < rows[j].Metric
}
return rows[i].Timestamp < rows[j].Timestamp
})
if _, err := w.writer.Write(rows); err != nil {
return fmt.Errorf("writing parquet rows: %w", err)
}
if err := writer.Close(); err != nil {
if err := w.writer.Flush(); err != nil {
return fmt.Errorf("flushing parquet row group: %w", err)
}
w.count += len(rows)
return nil
}
// Close finalises the Parquet file (footer, buffered I/O, file handle).
func (w *parquetArchiveWriter) Close() error {
if err := w.writer.Close(); err != nil {
w.f.Close()
return fmt.Errorf("closing parquet writer: %w", err)
}
if err := bw.Flush(); err != nil {
if err := w.bw.Flush(); err != nil {
w.f.Close()
return fmt.Errorf("flushing parquet file: %w", err)
}
return nil
return w.f.Close()
}
// sortParquetRows orders rows in-place by metric name, breaking ties
// by ascending timestamp.
func sortParquetRows(rows []ParquetMetricRow) {
	sort.Slice(rows, func(a, b int) bool {
		left, right := rows[a], rows[b]
		if left.Metric == right.Metric {
			return left.Timestamp < right.Timestamp
		}
		return left.Metric < right.Metric
	})
}
// loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns
@@ -179,6 +218,19 @@ func loadCheckpointFileFromDisk(filename string) (*CheckpointFile, error) {
}
}
// estimateRowCount estimates the number of Parquet rows a CheckpointFile will produce.
// Used for pre-allocating the rows slice to avoid repeated append doubling.
func estimateRowCount(cf *CheckpointFile) int {
	// Rows contributed directly by this checkpoint's own metrics.
	own := 0
	for _, cm := range cf.Metrics {
		own += len(cm.Data)
	}
	// Rows contributed by nested children, counted recursively.
	nested := 0
	for _, child := range cf.Children {
		nested += estimateRowCount(child)
	}
	return own + nested
}
// archiveCheckpointsToParquet reads checkpoint files for a host directory,
// converts them to Parquet rows. Returns the rows and filenames that were processed.
func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) {
@@ -196,7 +248,13 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu
return nil, nil, nil
}
var rows []ParquetMetricRow
// First pass: load checkpoints and estimate total rows for pre-allocation.
type loaded struct {
cf *CheckpointFile
filename string
}
var checkpoints []loaded
totalEstimate := 0
for _, checkpoint := range files {
filename := filepath.Join(dir, checkpoint)
@@ -205,9 +263,21 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu
cclog.Warnf("[METRICSTORE]> skipping unreadable checkpoint %s: %v", filename, err)
continue
}
rows = flattenCheckpointFile(cf, cluster, host, "node", "", rows)
totalEstimate += estimateRowCount(cf)
checkpoints = append(checkpoints, loaded{cf: cf, filename: checkpoint})
}
return rows, files, nil
if len(checkpoints) == 0 {
return nil, nil, nil
}
rows := make([]ParquetMetricRow, 0, totalEstimate)
processedFiles := make([]string, 0, len(checkpoints))
for _, cp := range checkpoints {
rows = flattenCheckpointFile(cp.cf, cluster, host, "node", "", rows)
processedFiles = append(processedFiles, cp.filename)
}
return rows, processedFiles, nil
}