diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 916736d0..3fc4f060 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -43,7 +43,6 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) { // cleanUpWorker takes simple values to configure what it does func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) { wg.Go(func() { - d, err := time.ParseDuration(interval) if err != nil { cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err) @@ -99,8 +98,8 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) { } type workItem struct { - dir string - cluster, host string + dir string + cluster, host string } var wg sync.WaitGroup @@ -187,9 +186,8 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err return totalFiles, err } - // Collect rows from all hosts in this cluster using worker pool + // Collect files to delete from all hosts in this cluster using worker pool type hostResult struct { - rows []ParquetMetricRow files []string // checkpoint filenames to delete after successful write dir string // checkpoint directory for this host } @@ -199,6 +197,8 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err dir, host string }, Keys.NumWorkers) + rowChan := make(chan *ParquetMetricRow, 10000) + var wg sync.WaitGroup errs := int32(0) @@ -207,19 +207,20 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err go func() { defer wg.Done() for item := range work { - rows, files, err := archiveCheckpointsToParquet(item.dir, cluster, item.host, from) + files, err := archiveCheckpointsToParquet(item.dir, cluster, item.host, from, rowChan) if err != nil { cclog.Errorf("[METRICSTORE]> error reading checkpoints for %s/%s: %s", cluster, item.host, err.Error()) atomic.AddInt32(&errs, 1) continue } - if len(rows) > 0 { - results <- hostResult{rows: rows, files: files, dir: item.dir} + if len(files) > 0 { + results <- hostResult{files: files, dir: item.dir} } } }() } + // Produce work items go func() { for _, hostEntry := range hostEntries { if !hostEntry.IsDir() { @@ -231,15 +232,22 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err }{dir: dir, host: hostEntry.Name()} } close(work) + }() + + // Wait for all workers and close rowChan and results + go func() { wg.Wait() + close(rowChan) close(results) }() - // Collect all rows and file info - var allRows []ParquetMetricRow + // Concurrently write from rowChan to Parquet + parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) + rowCount, writerErr := writeParquetArchiveStream(parquetFile, rowChan) + + // Collect all file info var allResults []hostResult for r := range results { - allRows = append(allRows, r.rows...) allResults = append(allResults, r) } @@ -247,17 +255,18 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster) } - if len(allRows) == 0 { + if writerErr != nil { + return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writerErr) + } + + if rowCount == 0 { + // Cleanup empty parquet file if created + os.Remove(parquetFile) continue } - // Write one Parquet file per cluster - parquetFile := filepath.Join(cleanupDir, cluster, fmt.Sprintf("%d.parquet", from)) - if err := writeParquetArchive(parquetFile, allRows); err != nil { - return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, err) - } - // Delete archived checkpoint files + totalFilesCluster := 0 for _, result := range allResults { for _, file := range result.files { filename := filepath.Join(result.dir, file) @@ -265,12 +274,13 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err cclog.Warnf("[METRICSTORE]> could not remove archived checkpoint %s: %v", filename, err) } else { totalFiles++ + totalFilesCluster++ } } } cclog.Infof("[METRICSTORE]> archived %d rows from %d files for cluster %s to %s", - len(allRows), totalFiles, cluster, parquetFile) + rowCount, totalFilesCluster, cluster, parquetFile) } return totalFiles, nil diff --git a/pkg/metricstore/parquetArchive.go b/pkg/metricstore/parquetArchive.go index 420ee4e5..4bd13300 100644 --- a/pkg/metricstore/parquetArchive.go +++ b/pkg/metricstore/parquetArchive.go @@ -31,15 +31,15 @@ type ParquetMetricRow struct { Value float32 `parquet:"value"` } -// flattenCheckpointFile recursively converts a CheckpointFile tree into Parquet rows. +// flattenCheckpointFile recursively converts a CheckpointFile tree into Parquet rows via a channel. // The scope path is built from the hierarchy: host level is "node", then child names // map to scope/scope_id (e.g., "socket0" → scope="socket", scope_id="0"). -func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string, rows []ParquetMetricRow) []ParquetMetricRow { +func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string, rowChan chan<- *ParquetMetricRow) { for metricName, cm := range cf.Metrics { ts := cm.Start for _, v := range cm.Data { if !v.IsNaN() { - rows = append(rows, ParquetMetricRow{ + rowChan <- &ParquetMetricRow{ Cluster: cluster, Hostname: hostname, Metric: metricName, @@ -48,7 +48,7 @@ func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID Timestamp: ts, Frequency: cm.Frequency, Value: float32(v), - }) + } } ts += cm.Frequency } @@ -56,10 +56,8 @@ func flattenCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID for childName, childCf := range cf.Children { childScope, childScopeID := parseScopeFromName(childName) - rows = flattenCheckpointFile(childCf, cluster, hostname, childScope, childScopeID, rows) + flattenCheckpointFile(childCf, cluster, hostname, childScope, childScopeID, rowChan) } - - return rows } // parseScopeFromName infers scope and scope_id from a child level name. @@ -91,15 +89,23 @@ func parseScopeFromName(name string) (string, string) { return name, "" } -// writeParquetArchive writes rows to a Parquet file with Zstd compression. -func writeParquetArchive(filename string, rows []ParquetMetricRow) error { +// writeParquetArchiveStream writes rows from a channel to a Parquet file with Zstd compression in batches. +func writeParquetArchiveStream(filename string, rowChan <-chan *ParquetMetricRow) (int, error) { if err := os.MkdirAll(filepath.Dir(filename), CheckpointDirPerms); err != nil { - return fmt.Errorf("creating archive directory: %w", err) + go func() { + for range rowChan { + } + }() + return 0, fmt.Errorf("creating archive directory: %w", err) } f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, CheckpointFilePerms) if err != nil { - return fmt.Errorf("creating parquet file: %w", err) + go func() { + for range rowChan { + } + }() + return 0, fmt.Errorf("creating parquet file: %w", err) } defer f.Close() @@ -115,19 +121,45 @@ func writeParquetArchive(filename string, rows []ParquetMetricRow) error { )), ) - if _, err := writer.Write(rows); err != nil { - return fmt.Errorf("writing parquet rows: %w", err) + batchSize := 4096 + batch := make([]ParquetMetricRow, 0, batchSize) + rowCount := 0 + var writeErr error + + for rowPtr := range rowChan { + if writeErr != nil { + continue // Drain the channel to prevent worker deadlock + } + batch = append(batch, *rowPtr) + if len(batch) >= batchSize { + if _, err := writer.Write(batch); err != nil { + writeErr = fmt.Errorf("writing parquet batch: %w", err) + } + rowCount += len(batch) + batch = batch[:0] + } + } + + if writeErr != nil { + return rowCount, writeErr + } + + if len(batch) > 0 { + if _, err := writer.Write(batch); err != nil { + return rowCount, fmt.Errorf("writing remaining parquet batch: %w", err) + } + rowCount += len(batch) } if err := writer.Close(); err != nil { - return fmt.Errorf("closing parquet writer: %w", err) + return rowCount, fmt.Errorf("closing parquet writer: %w", err) } if err := bw.Flush(); err != nil { - return fmt.Errorf("flushing parquet file: %w", err) + return rowCount, fmt.Errorf("flushing parquet file: %w", err) } - return nil + return rowCount, nil } // loadCheckpointFileFromDisk reads a JSON or binary checkpoint file and returns @@ -180,24 +212,22 @@ func loadCheckpointFileFromDisk(filename string) (*CheckpointFile, error) { } // archiveCheckpointsToParquet reads checkpoint files for a host directory, -// converts them to Parquet rows. Returns the rows and filenames that were processed. -func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]ParquetMetricRow, []string, error) { +// converts them to Parquet rows. Returns the filenames that were processed. +func archiveCheckpointsToParquet(dir, cluster, host string, from int64, rowChan chan<- *ParquetMetricRow) ([]string, error) { entries, err := os.ReadDir(dir) if err != nil { - return nil, nil, err + return nil, err } files, err := findFiles(entries, from, false) if err != nil { - return nil, nil, err + return nil, err } if len(files) == 0 { - return nil, nil, nil + return nil, nil } - var rows []ParquetMetricRow - for _, checkpoint := range files { filename := filepath.Join(dir, checkpoint) cf, err := loadCheckpointFileFromDisk(filename) @@ -206,8 +236,8 @@ func archiveCheckpointsToParquet(dir, cluster, host string, from int64) ([]Parqu continue } - rows = flattenCheckpointFile(cf, cluster, host, "node", "", rows) + flattenCheckpointFile(cf, cluster, host, "node", "", rowChan) } - return rows, files, nil + return files, nil } diff --git a/pkg/metricstore/parquetArchive_test.go b/pkg/metricstore/parquetArchive_test.go index d3d70c02..326f4c70 100644 --- a/pkg/metricstore/parquetArchive_test.go +++ b/pkg/metricstore/parquetArchive_test.go @@ -69,7 +69,16 @@ func TestFlattenCheckpointFile(t *testing.T) { }, } - rows := flattenCheckpointFile(cf, "fritz", "node001", "node", "", nil) + rowChan := make(chan *ParquetMetricRow, 100) + go func() { + flattenCheckpointFile(cf, "fritz", "node001", "node", "", rowChan) + close(rowChan) + }() + + var rows []ParquetMetricRow + for r := range rowChan { + rows = append(rows, *r) + } // cpu_load: 2 non-NaN values at node scope // mem_bw: 2 non-NaN values at socket0 scope @@ -153,17 +162,28 @@ func TestParquetArchiveRoundtrip(t *testing.T) { // Archive to Parquet archiveDir := filepath.Join(tmpDir, "archive") - rows, files, err := archiveCheckpointsToParquet(cpDir, "testcluster", "node001", 2000) + rowChan := make(chan *ParquetMetricRow, 100) + + var files []string + var archiveErr error + done := make(chan struct{}) + go func() { + files, archiveErr = archiveCheckpointsToParquet(cpDir, "testcluster", "node001", 2000, rowChan) + close(rowChan) + close(done) + }() + + parquetFile := filepath.Join(archiveDir, "testcluster", "1000.parquet") + _, err = writeParquetArchiveStream(parquetFile, rowChan) if err != nil { t.Fatal(err) } - if len(files) != 1 || files[0] != "1000.json" { - t.Fatalf("expected 1 file, got %v", files) + <-done + if archiveErr != nil { + t.Fatal(archiveErr) } - - parquetFile := filepath.Join(archiveDir, "testcluster", "1000.parquet") - if err := writeParquetArchive(parquetFile, rows); err != nil { - t.Fatal(err) + if len(files) != 1 || len(files) > 0 && files[0] != "1000.json" { + t.Fatalf("expected 1 file (1000.json), got %v", files) } // Read back and verify