From 586c902044161182617584be7bd0f1a31824853b Mon Sep 17 00:00:00 2001
From: Jan Eitzinger
Date: Mon, 23 Mar 2026 06:32:24 +0100
Subject: [PATCH] Restructure metricstore cleanup archiving to stay within 32k
 parquet-go limit

Entire-Checkpoint: 1660b8cf2571
---
 pkg/metricstore/archive.go        | 12 +++++++++---
 pkg/metricstore/parquetArchive.go |  9 +++++++--
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go
index 7eb1b72f..2246b0f3 100644
--- a/pkg/metricstore/archive.go
+++ b/pkg/metricstore/archive.go
@@ -22,6 +22,7 @@ import (
 
 func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
 	if Keys.Cleanup.Mode == "archive" {
+		cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")
 		// Run as Archiver
 		cleanUpWorker(wg, ctx,
 			Keys.RetentionInMemory,
@@ -43,7 +44,6 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
 // cleanUpWorker takes simple values to configure what it does
 func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
 	wg.Go(func() {
-
 		d, err := time.ParseDuration(interval)
 		if err != nil {
 			cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err)
@@ -99,8 +99,8 @@ func deleteCheckpoints(checkpointsDir string, from int64) (int, error) {
 	}
 
 	type workItem struct {
-		dir string
-		cluster, host string
+		dir           string
+		cluster, host string
 	}
 
 	var wg sync.WaitGroup
@@ -275,6 +275,12 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
 			break
 		}
 	}
+	// Flush once per host to keep row group count within parquet limits.
+	if writeErr == nil {
+		if err := writer.FlushRowGroup(); err != nil {
+			writeErr = err
+		}
+	}
 	}
 	// Always track files for deletion (even if write failed, we still drain)
 	toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files})
diff --git a/pkg/metricstore/parquetArchive.go b/pkg/metricstore/parquetArchive.go
index 260bd8dd..81c8be5a 100644
--- a/pkg/metricstore/parquetArchive.go
+++ b/pkg/metricstore/parquetArchive.go
@@ -99,7 +99,7 @@ func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) {
 
 // WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows,
 // writing metrics in sorted order without materializing all rows in memory.
-// Produces one row group per call (typically one host's data).
+// Call FlushRowGroup() after writing all checkpoint files for a host.
 func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error {
 	w.writeLevel(cf, cluster, hostname, scope, scopeID)
 
@@ -112,10 +112,16 @@ func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster,
 		w.batch = w.batch[:0]
 	}
 
+	return nil
+}
+
+// FlushRowGroup flushes the current row group to the Parquet file.
+// Should be called once per host after all checkpoint files for that host are written.
+func (w *parquetArchiveWriter) FlushRowGroup() error {
 	if err := w.writer.Flush(); err != nil {
 		return fmt.Errorf("flushing parquet row group: %w", err)
 	}
 	return nil
 }