Patch parquet archive writer to reduce high memory usage

This commit is contained in:
Aditya Ujeniya
2026-03-17 18:41:47 +01:00
parent 032d1e0692
commit 0a4c4d8e57
3 changed files with 112 additions and 52 deletions

View File

@@ -43,7 +43,6 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
// cleanUpWorker takes simple values to configure what it does // cleanUpWorker takes simple values to configure what it does
func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) { func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
wg.Go(func() { wg.Go(func() {
d, err := time.ParseDuration(interval) d, err := time.ParseDuration(interval)
if err != nil { if err != nil {
cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err) cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err)
@@ -99,8 +98,8 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) {
} }
type workItem struct { type workItem struct {
dir string dir string
cluster, host string cluster, host string
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@@ -187,9 +186,8 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
return totalFiles, err return totalFiles, err
} }
// Collect rows from all hosts in this cluster using worker pool // Collect files to delete from all hosts in this cluster using worker pool
type hostResult struct { type hostResult struct {
rows []ParquetMetricRow
files []string // checkpoint filenames to delete after successful write files []string // checkpoint filenames to delete after successful write
dir string // checkpoint directory for this host dir string // checkpoint directory for this host
} }
@@ -199,6 +197,8 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
dir, host string dir, host string
}, Keys.NumWorkers) }, Keys.NumWorkers)
rowChan := make(chan *ParquetMetricRow, 10000)
var wg sync.WaitGroup var wg sync.WaitGroup
errs := int32(0) errs := int32(0)
@@ -207,19 +207,20 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
go func() { go func() {
defer wg.Done() defer wg.Done()
for item := range work { for item := range work {
rows, files, err := archiveCheckpointsToParquet(item.dir, cluster, item.host, from) files, err := archiveCheckpointsToParquet(item.dir, cluster, item.host, from, rowChan)
if err != nil { if err != nil {
cclog.Errorf("[METRICSTORE]> error reading checkpoints for %s/%s: %s", cluster, item.host, err.Error()) cclog.Errorf("[METRICSTORE]> error reading checkpoints for %s/%s: %s", cluster, item.host, err.Error())
atomic.AddInt32(&errs, 1) atomic.AddInt32(&errs, 1)
continue continue
} }
if len(rows) > 0 { if len(files) > 0 {
results <- hostResult{rows: rows, files: files, dir: item.dir} results <- hostResult{files: files, dir: item.dir}
} }
} }
}() }()
} }
// Produce work items
go func() { go func() {
for _, hostEntry := range hostEntries { for _, hostEntry := range hostEntries {
if !hostEntry.IsDir() { if !hostEntry.IsDir() {
@@ -231,15 +232,22 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
}{dir: dir, host: hostEntry.Name()} }{dir: dir, host: hostEntry.Name()}
} }
close(work) close(work)
}()
// Wait for all workers and close rowChan and results
go func() {
wg.Wait() wg.Wait()
close(rowChan)
close(results) close(results)
}() }()
// Collect all rows and file info // Concurrently write from rowChan to Parquet
var allRows []ParquetMetricRow parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from))
rowCount, writerErr := writeParquetArchiveStream(parquetFile, rowChan)
// Collect all file info
var allResults []hostResult var allResults []hostResult
for r := range results { for r := range results {
allRows = append(allRows, r.rows...)
allResults = append(allResults, r) allResults = append(allResults, r)
} }
@@ -247,17 +255,18 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster) return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster)
} }
if len(allRows) == 0 { if writerErr != nil {
return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writerErr)
}
if rowCount == 0 {
// Cleanup empty parquet file if created
os.Remove(parquetFile)
continue continue
} }
// Write one Parquet file per cluster
parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from))
if err := writeParquetArchive(parquetFile, allRows); err != nil {
return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, err)
}
// Delete archived checkpoint files // Delete archived checkpoint files
totalFilesCluster := 0
for _, result := range allResults { for _, result := range allResults {
for _, file := range result.files { for _, file := range result.files {
filename := filepath.Join(result.dir, file) filename := filepath.Join(result.dir, file)
@@ -265,12 +274,13 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
cclog.Warnf("[METRICSTORE]> could not remove archived checkpoint %s: %v", filename, err) cclog.Warnf("[METRICSTORE]> could not remove archived checkpoint %s: %v", filename, err)
} else { } else {
totalFiles++ totalFiles++
totalFilesCluster++
} }
} }
} }
cclog.Infof("[METRICSTORE]> archived %d rows from %d files for cluster %s to %s", cclog.Infof("[METRICSTORE]> archived %d rows from %d files for cluster %s to %s",
len(allRows), totalFiles, cluster, parquetFile) rowCount, totalFilesCluster, cluster, parquetFile)
} }
return totalFiles, nil return totalFiles, nil

View File

@@ -31,15 +31,15 @@ type ParquetMetricRow struct {
Value float32 `parquet:"value"` Value float32 `parquet:"value"`
} }
// flattenCheckpointFile recursively converts a CheckpointFile tree into Parquet rows. // flattenCheckpointFile recursively converts a CheckpointFile tree into Parquet rows via a channel.
// The scope path is built from the hierarchy: host level is "node", then child names // The scope path is built from the hierarchy: host level is "node", then child names
// map to scope/scope_id (e.g., "socket0" → scope="socket", scope_id="0"). // map to scope/scope_id (e.g., "socket0" → scope="socket", scope_id="0").
func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string, rows []ParquetMetricRow) []ParquetMetricRow { func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string, rowChan chan<- *ParquetMetricRow) {
for metricName, cm := range cf.Metrics { for metricName, cm := range cf.Metrics {
ts := cm.Start ts := cm.Start
for _, v := range cm.Data { for _, v := range cm.Data {
if !v.IsNaN() { if !v.IsNaN() {
rows = append(rows, ParquetMetricRow{ rowChan <- &ParquetMetricRow{
Cluster: cluster, Cluster: cluster,
Hostname: hostname, Hostname: hostname,
Metric: metricName, Metric: metricName,
@@ -48,7 +48,7 @@ func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID
Timestamp: ts, Timestamp: ts,
Frequency: cm.Frequency, Frequency: cm.Frequency,
Value: float32(v), Value: float32(v),
}) }
} }
ts += cm.Frequency ts += cm.Frequency
} }
@@ -56,10 +56,8 @@ func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID
for childName, childCf := range cf.Children { for childName, childCf := range cf.Children {
childScope, childScopeID := parseScopeFromName(childName) childScope, childScopeID := parseScopeFromName(childName)
rows = flattenCheckpointFile(childCf, cluster, hostname, childScope, childScopeID, rows) flattenCheckpointFile(childCf, cluster, hostname, childScope, childScopeID, rowChan)
} }
return rows
} }
// parseScopeFromName infers scope and scope_id from a child level name. // parseScopeFromName infers scope and scope_id from a child level name.
@@ -91,15 +89,23 @@ func parseScopeFromName(name string) (string, string) {
return name, "" return name, ""
} }
// writeParquetArchive writes rows to a Parquet file with Zstd compression. // writeParquetArchiveStream writes rows from a channel to a Parquet file with Zstd compression in batches.
func writeParquetArchive(filename string, rows []ParquetMetricRow) error { func writeParquetArchiveStream(filename string, rowChan <-chan *ParquetMetricRow) (int, error) {
if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil { if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil {
return fmt.Errorf("creating archive directory: %w", err) go func() {
for range rowChan {
}
}()
return 0, fmt.Errorf("creating archive directory: %w", err)
} }
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms)
if err != nil { if err != nil {
return fmt.Errorf("creating parquet file: %w", err) go func() {
for range rowChan {
}
}()
return 0, fmt.Errorf("creating parquet file: %w", err)
} }
defer f.Close() defer f.Close()
@@ -115,19 +121,45 @@ func writeParquetArchive(filename string, rows []ParquetMetricRow) error {
)), )),
) )
if _, err := writer.Write(rows); err != nil { batchSize := 4096
return fmt.Errorf("writing parquet rows: %w", err) batch := make([]ParquetMetricRow, 0, batchSize)
rowCount := 0
var writeErr error
for rowPtr := range rowChan {
if writeErr != nil {
continue // Drain the channel to prevent worker deadlock
}
batch = append(batch, *rowPtr)
if len(batch) >= batchSize {
if _, err := writer.Write(batch); err != nil {
writeErr = fmt.Errorf("writing parquet batch: %w", err)
}
rowCount += len(batch)
batch = batch[:0]
}
}
if writeErr != nil {
return rowCount, writeErr
}
if len(batch) > 0 {
if _, err := writer.Write(batch); err != nil {
return rowCount, fmt.Errorf("writing remaining parquet batch: %w", err)
}
rowCount += len(batch)
} }
if err := writer.Close(); err != nil { if err := writer.Close(); err != nil {
return fmt.Errorf("closing parquet writer: %w", err) return rowCount, fmt.Errorf("closing parquet writer: %w", err)
} }
if err := bw.Flush(); err != nil { if err := bw.Flush(); err != nil {
return fmt.Errorf("flushing parquet file: %w", err) return rowCount, fmt.Errorf("flushing parquet file: %w", err)
} }
return nil return rowCount, nil
} }
// loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns // loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns
@@ -180,24 +212,22 @@ func loadCheckpointFileFromDisk(filename string) (*CheckpointFile, error) {
} }
// archiveCheckpointsToParquet reads checkpoint files for a host directory, // archiveCheckpointsToParquet reads checkpoint files for a host directory,
// converts them to Parquet rows. Returns the rows and filenames that were processed. // converts them to Parquet rows. Returns the filenames that were processed.
func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) { func archiveCheckpointsToParquet(dir, cluster, host string, from int64, rowChan chan<- *ParquetMetricRow) ([]string, error) {
entries, err := os.ReadDir(dir) entries, err := os.ReadDir(dir)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
files, err := findFiles(entries, from, false) files, err := findFiles(entries, from, false)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
if len(files) == 0 { if len(files) == 0 {
return nil, nil, nil return nil, nil
} }
var rows []ParquetMetricRow
for _, checkpoint := range files { for _, checkpoint := range files {
filename := filepath.Join(dir, checkpoint) filename := filepath.Join(dir, checkpoint)
cf, err := loadCheckpointFileFromDisk(filename) cf, err := loadCheckpointFileFromDisk(filename)
@@ -206,8 +236,8 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu
continue continue
} }
rows = flattenCheckpointFile(cf, cluster, host, "node", "", rows) flattenCheckpointFile(cf, cluster, host, "node", "", rowChan)
} }
return rows, files, nil return files, nil
} }

View File

@@ -69,7 +69,16 @@ func TestFlattenCheckpointFile(t *testing.T) {
}, },
} }
rows := flattenCheckpointFile(cf, "fritz", "node001", "node", "", nil) rowChan := make(chan *ParquetMetricRow, 100)
go func() {
flattenCheckpointFile(cf, "fritz", "node001", "node", "", rowChan)
close(rowChan)
}()
var rows []ParquetMetricRow
for r := range rowChan {
rows = append(rows, *r)
}
// cpu_load: 2 non-NaN values at node scope // cpu_load: 2 non-NaN values at node scope
// mem_bw: 2 non-NaN values at socket0 scope // mem_bw: 2 non-NaN values at socket0 scope
@@ -153,17 +162,28 @@ func TestParquetArchiveRoundtrip(t *testing.T) {
// Archive to Parquet // Archive to Parquet
archiveDir := filepath.Join(tmpDir, "archive") archiveDir := filepath.Join(tmpDir, "archive")
rows, files, err := archiveCheckpointsToParquet(cpDir, "testcluster", "node001", 2000) rowChan := make(chan *ParquetMetricRow, 100)
var files []string
var archiveErr error
done := make(chan struct{})
go func() {
files, archiveErr = archiveCheckpointsToParquet(cpDir, "testcluster", "node001", 2000, rowChan)
close(rowChan)
close(done)
}()
parquetFile := filepath.Join(archiveDir, "testcluster", "1000.parquet")
_, err = writeParquetArchiveStream(parquetFile, rowChan)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(files) != 1 || files[0] != "1000.json" { <-done
t.Fatalf("expected 1 file, got %v", files) if archiveErr != nil {
t.Fatal(archiveErr)
} }
if len(files) != 1 || len(files) > 0 && files[0] != "1000.json" {
parquetFile := filepath.Join(archiveDir, "testcluster", "1000.parquet") t.Fatalf("expected 1 file (1000.json), got %v", files)
if err := writeParquetArchive(parquetFile, rows); err != nil {
t.Fatal(err)
} }
// Read back and verify // Read back and verify