Restructure metricstore cleanup archiving to stay withinh 32k parquet-go limit

Entire-Checkpoint: 1660b8cf2571
This commit is contained in:
2026-03-23 06:32:24 +01:00
parent 01ec70baa8
commit 586c902044
2 changed files with 16 additions and 5 deletions

View File

@@ -22,6 +22,7 @@ import (
func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
if Keys.Cleanup.Mode == "archive" {
cclog.Info("[METRICSTORE]> enable archive cleanup to parquet")
// Run as Archiver
cleanUpWorker(wg, ctx,
Keys.RetentionInMemory,
@@ -43,7 +44,6 @@ func CleanUp(wg *sync.WaitGroup, ctx context.Context) {
// cleanUpWorker takes simple values to configure what it does
func cleanUpWorker(wg *sync.WaitGroup, ctx context.Context, interval string, mode string, cleanupDir string, delete bool) {
wg.Go(func() {
d, err := time.ParseDuration(interval)
if err != nil {
cclog.Fatalf("[METRICSTORE]> error parsing %s interval duration: %v\n", mode, err)
@@ -275,6 +275,12 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
break
}
}
// Flush once per host to keep row group count within parquet limits.
if writeErr == nil {
if err := writer.FlushRowGroup(); err != nil {
writeErr = err
}
}
}
// Always track files for deletion (even if write failed, we still drain)
toDelete = append(toDelete, deleteItem{dir: r.dir, files: r.files})

View File

@@ -99,7 +99,7 @@ func newParquetArchiveWriter(filename string) (*parquetArchiveWriter, error) {
// WriteCheckpointFile streams a CheckpointFile tree directly to Parquet rows,
// writing metrics in sorted order without materializing all rows in memory.
// Produces one row group per call (typically one host's data).
// Call FlushRowGroup() after writing all checkpoint files for a host.
func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster, hostname, scope, scopeID string) error {
w.writeLevel(cf, cluster, hostname, scope, scopeID)
@@ -112,10 +112,15 @@ func (w *parquetArchiveWriter) WriteCheckpointFile(cf *CheckpointFile, cluster,
w.batch = w.batch[:0]
}
return nil
}
// FlushRowGroup flushes the current row group to the Parquet file.
// Should be called once per host after all checkpoint files for that host are written.
func (w *parquetArchiveWriter) FlushRowGroup() error {
if err := w.writer.Flush(); err != nil {
return fmt.Errorf("flushing parquet row group: %w", err)
}
return nil
}