From e41d1251ba52de53e18fdf7c8920d0f7b28209cf Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Mon, 23 Mar 2026 06:37:24 +0100 Subject: [PATCH] fix: Continue on error Entire-Checkpoint: 6000eb5a5bb8 --- pkg/metricstore/archive.go | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/pkg/metricstore/archive.go b/pkg/metricstore/archive.go index 2246b0f3..3b92a3e0 100644 --- a/pkg/metricstore/archive.go +++ b/pkg/metricstore/archive.go @@ -11,6 +11,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "sync" "sync/atomic" "time" @@ -181,6 +182,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err } totalFiles := 0 + var clusterErrors []string for _, clusterEntry := range clusterEntries { if !clusterEntry.IsDir() { @@ -190,7 +192,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err cluster := clusterEntry.Name() hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster)) if err != nil { - return totalFiles, err + cclog.Errorf("[METRICSTORE]> error reading host entries for cluster %s: %s", cluster, err.Error()) + clusterErrors = append(clusterErrors, cluster) + continue } // Workers load checkpoint files from disk; main thread writes to parquet. @@ -255,7 +259,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err // Drain results channel to unblock workers for range results { } - return totalFiles, fmt.Errorf("creating parquet writer for cluster %s: %w", cluster, err) + cclog.Errorf("[METRICSTORE]> error creating parquet writer for cluster %s: %s", cluster, err.Error()) + clusterErrors = append(clusterErrors, cluster) + continue } type deleteItem struct { @@ -291,7 +297,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err } if errs > 0 { - return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster) + cclog.Errorf("[METRICSTORE]> %d errors reading checkpoints for cluster %s", errs, cluster) + clusterErrors = append(clusterErrors, cluster) + os.Remove(parquetFile) + continue } if writer.count == 0 { @@ -302,7 +311,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err if writeErr != nil { os.Remove(parquetFile) - return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writeErr) + cclog.Errorf("[METRICSTORE]> error writing parquet archive for cluster %s: %s", cluster, writeErr.Error()) + clusterErrors = append(clusterErrors, cluster) + continue } // Delete archived checkpoint files @@ -322,5 +333,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err } cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles) + + if len(clusterErrors) > 0 { + return totalFiles, fmt.Errorf("archiving failed for clusters: %s", strings.Join(clusterErrors, ", ")) + } + return totalFiles, nil }