fix: Continue on error

Entire-Checkpoint: 6000eb5a5bb8
This commit is contained in:
2026-03-23 06:37:24 +01:00
parent 586c902044
commit e41d1251ba

View File

@@ -11,6 +11,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -181,6 +182,7 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
} }
totalFiles := 0 totalFiles := 0
var clusterErrors []string
for _, clusterEntry := range clusterEntries { for _, clusterEntry := range clusterEntries {
if !clusterEntry.IsDir() { if !clusterEntry.IsDir() {
@@ -190,7 +192,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
cluster := clusterEntry.Name() cluster := clusterEntry.Name()
hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster)) hostEntries, err := os.ReadDir(filepath.Join(checkpointsDir, cluster))
if err != nil { if err != nil {
return totalFiles, err cclog.Errorf("[METRICSTORE]> error reading host entries for cluster %s: %s", cluster, err.Error())
clusterErrors = append(clusterErrors, cluster)
continue
} }
// Workers load checkpoint files from disk; main thread writes to parquet. // Workers load checkpoint files from disk; main thread writes to parquet.
@@ -255,7 +259,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
// Drain results channel to unblock workers // Drain results channel to unblock workers
for range results { for range results {
} }
return totalFiles, fmt.Errorf("creating parquet writer for cluster %s: %w", cluster, err) cclog.Errorf("[METRICSTORE]> error creating parquet writer for cluster %s: %s", cluster, err.Error())
clusterErrors = append(clusterErrors, cluster)
continue
} }
type deleteItem struct { type deleteItem struct {
@@ -291,7 +297,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
} }
if errs > 0 { if errs > 0 {
return totalFiles, fmt.Errorf("%d errors reading checkpoints for cluster %s", errs, cluster) cclog.Errorf("[METRICSTORE]> %d errors reading checkpoints for cluster %s", errs, cluster)
clusterErrors = append(clusterErrors, cluster)
os.Remove(parquetFile)
continue
} }
if writer.count == 0 { if writer.count == 0 {
@@ -302,7 +311,9 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
if writeErr != nil { if writeErr != nil {
os.Remove(parquetFile) os.Remove(parquetFile)
return totalFiles, fmt.Errorf("writing parquet archive for cluster %s: %w", cluster, writeErr) cclog.Errorf("[METRICSTORE]> error writing parquet archive for cluster %s: %s", cluster, writeErr.Error())
clusterErrors = append(clusterErrors, cluster)
continue
} }
// Delete archived checkpoint files // Delete archived checkpoint files
@@ -322,5 +333,10 @@ func archiveCheckpoints(checkpointsDir, cleanupDir string, from int64) (int, err
} }
cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles) cclog.Infof("[METRICSTORE]> archiving checkpoints completed in %s (%d files)", time.Since(startTime).Round(time.Millisecond), totalFiles)
if len(clusterErrors) > 0 {
return totalFiles, fmt.Errorf("archiving failed for clusters: %s", strings.Join(clusterErrors, ", "))
}
return totalFiles, nil return totalFiles, nil
} }