fix: reduce memory usage in parquet checkpoint archiver

Stream CheckpointFile trees directly to parquet rows instead of
materializing all rows in a giant intermediate slice. This eliminates
~1.9GB per host of redundant allocations (repeated string headers)
and removes the expensive sort on millions of 104-byte structs.

Key changes:
- Replace flattenCheckpointFile + sortParquetRows + WriteHostRows with
  streaming WriteCheckpointFile that walks the tree with sorted keys
- Reduce results channel buffer from len(hostEntries) to 2 for
  back-pressure (at most NumWorkers+2 results in flight)
- Workers send CheckpointFile trees instead of []ParquetMetricRow
- Write rows in small 1024-element batches via reusable buffer

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Entire-Checkpoint: f31dc1847539
This commit is contained in:
2026-03-18 17:32:16 +01:00
parent bb6915771d
commit 09501df3c2
3 changed files with 155 additions and 120 deletions

View File

@@ -168,8 +168,9 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) {
// archiveCheckpoints archives checkpoint files to Parquet format. // archiveCheckpoints archives checkpoint files to Parquet format.
// Produces one Parquet file per cluster: <cleanupDir>/<cluster>/<timestamp>.parquet // Produces one Parquet file per cluster: <cleanupDir>/<cluster>/<timestamp>.parquet
// Each host's rows are written as a separate row group to avoid accumulating // Workers load checkpoint files from disk and send CheckpointFile trees on a
// all data in memory at once. // back-pressured channel. The main thread streams each tree directly to Parquet
// rows without materializing all rows in memory.
func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, error) { func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, error) {
cclog.Info("[METRICSTORE]> start archiving checkpoints to parquet") cclog.Info("[METRICSTORE]> start archiving checkpoints to parquet")
startTime := time.Now() startTime := time.Now()
@@ -192,14 +193,16 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
return totalFiles, err return totalFiles, err
} }
// Stream per-host rows to parquet writer via worker pool // Workers load checkpoint files from disk; main thread writes to parquet.
type hostResult struct { type hostResult struct {
rows []ParquetMetricRow checkpoints []*CheckpointFile
hostname string
files []string // checkpoint filenames to delete after successful write files []string // checkpoint filenames to delete after successful write
dir string // checkpoint directory for this host dir string // checkpoint directory for this host
} }
results := make(chan hostResult, len(hostEntries)) // Small buffer provides back-pressure: at most NumWorkers+2 results in flight.
results := make(chan hostResult, 2)
work := make(chan struct { work := make(chan struct {
dir, host string dir, host string
}, Keys.NumWorkers) }, Keys.NumWorkers)
@@ -212,14 +215,19 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
go func() { go func() {
defer wg.Done() defer wg.Done()
for item := range work { for item := range work {
rows, files, err := archiveCheckpointsToParquet(item.dir, cluster, item.host, from) checkpoints, files, err := loadCheckpointFiles(item.dir, from)
if err != nil { if err != nil {
cclog.Errorf("[METRICSTORE]> error reading checkpoints for %s/%s: %s", cluster, item.host, err.Error()) cclog.Errorf("[METRICSTORE]> error reading checkpoints for %s/%s: %s", cluster, item.host, err.Error())
atomic.AddInt32(&errs, 1) atomic.AddInt32(&errs, 1)
continue continue
} }
if len(rows) > 0 { if len(checkpoints) > 0 {
results <- hostResult{rows: rows, files: files, dir: item.dir} results <- hostResult{
checkpoints: checkpoints,
hostname: item.host,
files: files,
dir: item.dir,
}
} }
} }
}() }()
@@ -240,7 +248,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
close(results) close(results)
}() }()
// Open streaming writer and write each host's rows as a row group // Open streaming writer and write each host's checkpoint files as a row group
parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from))
writer, err := newParquetArchiveWriter(parquetFile) writer, err := newParquetArchiveWriter(parquetFile)
if err != nil { if err != nil {
@@ -259,9 +267,13 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
for r := range results { for r := range results {
if writeErr == nil { if writeErr == nil {
sortParquetRows(r.rows) // Stream each checkpoint file directly to parquet rows.
if err := writer.WriteHostRows(r.rows); err != nil { // Each checkpoint is processed and discarded before the next.
for _, cf := range r.checkpoints {
if err := writer.WriteCheckpointFile(cf, cluster, r.hostname, "node", ""); err != nil {
writeErr = err writeErr = err
break
}
} }
} }
// Always track files for deletion (even if write failed, we still drain) // Always track files for deletion (even if write failed, we still drain)

View File

@@ -14,7 +14,6 @@ import (
"path/filepath" "path/filepath"
"sort" "sort"
cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
pq "github.com/parquet-go/parquet-go" pq "github.com/parquet-go/parquet-go"
) )
@@ -32,37 +31,6 @@ type ParquetMetricRow struct {
Value float32 `parquet:"value"` Value float32 `parquet:"value"`
} }
// flattenCheckpointFile recursively converts a CheckpointFile tree into Parquet rows.
// The scope path is built from the hierarchy: host level is "node", then child names
// map to scope/scope_id (e.g., "socket0" → scope="socket", scope_id="0").
func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string, rows []ParquetMetricRow) []ParquetMetricRow {
for metricName, cm := range cf.Metrics {
ts := cm.Start
for _, v := range cm.Data {
if !v.IsNaN() {
rows = append(rows, ParquetMetricRow{
Cluster: cluster,
Hostname: hostname,
Metric: metricName,
Scope: scope,
ScopeID: scopeID,
Timestamp: ts,
Frequency: cm.Frequency,
Value: float32(v),
})
}
ts += cm.Frequency
}
}
for childName, childCf := range cf.Children {
childScope, childScopeID := parseScopeFromName(childName)
rows = flattenCheckpointFile(childCf, cluster, hostname, childScope, childScopeID, rows)
}
return rows
}
// parseScopeFromName infers scope and scope_id from a child level name. // parseScopeFromName infers scope and scope_id from a child level name.
// Examples: "socket0" → ("socket", "0"), "core12" → ("core", "12"), // Examples: "socket0" → ("socket", "0"), "core12" → ("core", "12"),
// "a0" (accelerator) → ("accelerator", "0"). // "a0" (accelerator) → ("accelerator", "0").
@@ -93,15 +61,17 @@ func parseScopeFromName(name string) (string, string) {
} }
// parquetArchiveWriter supports incremental writes to a Parquet file. // parquetArchiveWriter supports incremental writes to a Parquet file.
// Each call to WriteHostRows writes one row group (typically one host's data), // Uses streaming writes to avoid accumulating all rows in memory.
// avoiding accumulation of all rows in memory.
type parquetArchiveWriter struct { type parquetArchiveWriter struct {
writer *pq.GenericWriter[ParquetMetricRow] writer *pq.GenericWriter[ParquetMetricRow]
bw *bufio.Writer bw *bufio.Writer
f *os.File f *os.File
batch []ParquetMetricRow // reusable batch buffer
count int count int
} }
const parquetBatchSize = 1024
// newParquetArchiveWriter creates a streaming Parquet writer with Zstd compression. // newParquetArchiveWriter creates a streaming Parquet writer with Zstd compression.
func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) { func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) {
if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil { if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil {
@@ -119,31 +89,85 @@ func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) {
pq.Compression(&pq.Zstd), pq.Compression(&pq.Zstd),
) )
return &parquetArchiveWriter{writer: writer, bw: bw, f: f}, nil return &parquetArchiveWriter{
writer: writer,
bw: bw,
f: f,
batch: make([]ParquetMetricRow, 0, parquetBatchSize),
}, nil
} }
// WriteHostRows sorts rows by (metric, timestamp) in-place, writes them, // WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows,
// and flushes to create a separate row group. // writing metrics in sorted order without materializing all rows in memory.
func (w *parquetArchiveWriter) WriteHostRows(rows []ParquetMetricRow) error { // Produces one row group per call (typically one host's data).
sort.Slice(rows, func(i, j int) bool { func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error {
if rows[i].Metric != rows[j].Metric { w.writeLevel(cf, cluster, hostname, scope, scopeID)
return rows[i].Metric < rows[j].Metric
}
return rows[i].Timestamp < rows[j].Timestamp
})
if _, err := w.writer.Write(rows); err != nil { // Flush remaining batch
if len(w.batch) > 0 {
if _, err := w.writer.Write(w.batch); err != nil {
return fmt.Errorf("writing parquet rows: %w", err) return fmt.Errorf("writing parquet rows: %w", err)
} }
w.count += len(w.batch)
w.batch = w.batch[:0]
}
if err := w.writer.Flush(); err != nil { if err := w.writer.Flush(); err != nil {
return fmt.Errorf("flushing parquet row group: %w", err) return fmt.Errorf("flushing parquet row group: %w", err)
} }
w.count += len(rows)
return nil return nil
} }
// writeLevel recursively writes metrics from a CheckpointFile level.
// Metric names and child names are sorted for deterministic, compression-friendly output.
func (w *parquetArchiveWriter) writeLevel(cf *CheckpointFile, cluster, hostname, scope, scopeID string) {
// Sort metric names for deterministic order
metricNames := make([]string, 0, len(cf.Metrics))
for name := range cf.Metrics {
metricNames = append(metricNames, name)
}
sort.Strings(metricNames)
for _, metricName := range metricNames {
cm := cf.Metrics[metricName]
ts := cm.Start
for _, v := range cm.Data {
if !v.IsNaN() {
w.batch = append(w.batch, ParquetMetricRow{
Cluster: cluster,
Hostname: hostname,
Metric: metricName,
Scope: scope,
ScopeID: scopeID,
Timestamp: ts,
Frequency: cm.Frequency,
Value: float32(v),
})
if len(w.batch) >= parquetBatchSize {
w.writer.Write(w.batch)
w.count += len(w.batch)
w.batch = w.batch[:0]
}
}
ts += cm.Frequency
}
}
// Sort child names for deterministic order
childNames := make([]string, 0, len(cf.Children))
for name := range cf.Children {
childNames = append(childNames, name)
}
sort.Strings(childNames)
for _, childName := range childNames {
childScope, childScopeID := parseScopeFromName(childName)
w.writeLevel(cf.Children[childName], cluster, hostname, childScope, childScopeID)
}
}
// Close finalises the Parquet file (footer, buffered I/O, file handle). // Close finalises the Parquet file (footer, buffered I/O, file handle).
func (w *parquetArchiveWriter) Close() error { func (w *parquetArchiveWriter) Close() error {
if err := w.writer.Close(); err != nil { if err := w.writer.Close(); err != nil {
@@ -159,16 +183,6 @@ func (w *parquetArchiveWriter) Close() error {
return w.f.Close() return w.f.Close()
} }
// sortParquetRows sorts rows by (metric, timestamp) in-place.
func sortParquetRows(rows []ParquetMetricRow) {
sort.Slice(rows, func(i, j int) bool {
if rows[i].Metric != rows[j].Metric {
return rows[i].Metric < rows[j].Metric
}
return rows[i].Timestamp < rows[j].Timestamp
})
}
// loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns // loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns
// a CheckpointFile. Used by the Parquet archiver to read checkpoint data // a CheckpointFile. Used by the Parquet archiver to read checkpoint data
// before converting it to Parquet format. // before converting it to Parquet format.
@@ -218,22 +232,10 @@ func loadCheckpointFileFromDisk(filename string) (*CheckpointFile, error) {
} }
} }
// estimateRowCount estimates the number of Parquet rows a CheckpointFile will produce. // loadCheckpointFiles reads checkpoint files for a host directory and returns
// Used for pre-allocating the rows slice to avoid repeated append doubling. // the loaded CheckpointFiles and their filenames. Processes one file at a time
func estimateRowCount(cf *CheckpointFile) int { // to avoid holding all checkpoint data in memory simultaneously.
n := 0 func loadCheckpointFiles(dir string, from int64) ([]*CheckpointFile, []string, error) {
for _, cm := range cf.Metrics {
n += len(cm.Data)
}
for _, child := range cf.Children {
n += estimateRowCount(child)
}
return n
}
// archiveCheckpointsToParquet reads checkpoint files for a host directory,
// converts them to Parquet rows. Returns the rows and filenames that were processed.
func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) {
entries, err := os.ReadDir(dir) entries, err := os.ReadDir(dir)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@@ -248,36 +250,18 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu
return nil, nil, nil return nil, nil, nil
} }
// First pass: load checkpoints and estimate total rows for pre-allocation. var checkpoints []*CheckpointFile
type loaded struct { var processedFiles []string
cf *CheckpointFile
filename string
}
var checkpoints []loaded
totalEstimate := 0
for _, checkpoint := range files { for _, checkpoint := range files {
filename := filepath.Join(dir, checkpoint) filename := filepath.Join(dir, checkpoint)
cf, err := loadCheckpointFileFromDisk(filename) cf, err := loadCheckpointFileFromDisk(filename)
if err != nil { if err != nil {
cclog.Warnf("[METRICSTORE]> skipping unreadable checkpoint %s: %v", filename, err)
continue continue
} }
totalEstimate += estimateRowCount(cf) checkpoints = append(checkpoints, cf)
checkpoints = append(checkpoints, loaded{cf: cf, filename: checkpoint}) processedFiles = append(processedFiles, checkpoint)
} }
if len(checkpoints) == 0 { return checkpoints, processedFiles, nil
return nil, nil, nil
}
rows := make([]ParquetMetricRow, 0, totalEstimate)
processedFiles := make([]string, 0, len(checkpoints))
for _, cp := range checkpoints {
rows = flattenCheckpointFile(cp.cf, cluster, host, "node", "", rows)
processedFiles = append(processedFiles, cp.filename)
}
return rows, processedFiles, nil
} }

View File

@@ -44,7 +44,7 @@ func TestParseScopeFromName(t *testing.T) {
} }
} }
func TestFlattenCheckpointFile(t *testing.T) { func TestWriteCheckpointFile(t *testing.T) {
cf := &CheckpointFile{ cf := &CheckpointFile{
From: 1000, From: 1000,
To: 1060, To: 1060,
@@ -69,17 +69,55 @@ func TestFlattenCheckpointFile(t *testing.T) {
}, },
} }
rows := flattenCheckpointFile(cf, "fritz", "node001", "node", "", nil) tmpDir := t.TempDir()
parquetFile := filepath.Join(tmpDir, "test.parquet")
writer, err := newParquetArchiveWriter(parquetFile)
if err != nil {
t.Fatal(err)
}
if err := writer.WriteCheckpointFile(cf, "fritz", "node001", "node", ""); err != nil {
t.Fatal(err)
}
if err := writer.Close(); err != nil {
t.Fatal(err)
}
// cpu_load: 2 non-NaN values at node scope // cpu_load: 2 non-NaN values at node scope
// mem_bw: 2 non-NaN values at socket0 scope // mem_bw: 2 non-NaN values at socket0 scope
if len(rows) != 4 { if writer.count != 4 {
t.Fatalf("expected 4 rows, got %d", len(rows)) t.Fatalf("expected 4 rows written, got %d", writer.count)
}
// Read back and verify
f, err := os.Open(parquetFile)
if err != nil {
t.Fatal(err)
}
defer f.Close()
stat, _ := f.Stat()
pf, err := pq.OpenFile(f, stat.Size())
if err != nil {
t.Fatal(err)
}
reader := pq.NewGenericReader[ParquetMetricRow](pf)
readRows := make([]ParquetMetricRow, 100)
n, err := reader.Read(readRows)
if err != nil && n == 0 {
t.Fatal(err)
}
readRows = readRows[:n]
reader.Close()
if n != 4 {
t.Fatalf("expected 4 rows, got %d", n)
} }
// Verify a node-scope row // Verify a node-scope row
found := false found := false
for _, r := range rows { for _, r := range readRows {
if r.Metric == "cpu_load" && r.Timestamp == 1000 { if r.Metric == "cpu_load" && r.Timestamp == 1000 {
found = true found = true
if r.Cluster != "fritz" || r.Hostname != "node001" || r.Scope != "node" || r.Value != 0.5 { if r.Cluster != "fritz" || r.Hostname != "node001" || r.Scope != "node" || r.Value != 0.5 {
@@ -93,7 +131,7 @@ func TestFlattenCheckpointFile(t *testing.T) {
// Verify a socket-scope row // Verify a socket-scope row
found = false found = false
for _, r := range rows { for _, r := range readRows {
if r.Metric == "mem_bw" && r.Scope == "socket" && r.ScopeID == "0" { if r.Metric == "mem_bw" && r.Scope == "socket" && r.ScopeID == "0" {
found = true found = true
} }
@@ -153,7 +191,7 @@ func TestParquetArchiveRoundtrip(t *testing.T) {
// Archive to Parquet // Archive to Parquet
archiveDir := filepath.Join(tmpDir, "archive") archiveDir := filepath.Join(tmpDir, "archive")
rows, files, err := archiveCheckpointsToParquet(cpDir, "testcluster", "node001", 2000) checkpoints, files, err := loadCheckpointFiles(cpDir, 2000)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -166,10 +204,11 @@ func TestParquetArchiveRoundtrip(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
sortParquetRows(rows) for _, cp := range checkpoints {
if err := writer.WriteHostRows(rows); err != nil { if err := writer.WriteCheckpointFile(cp, "testcluster", "node001", "node", ""); err != nil {
t.Fatal(err) t.Fatal(err)
} }
}
if err := writer.Close(); err != nil { if err := writer.Close(); err != nil {
t.Fatal(err) t.Fatal(err)
} }