From 2c8608f5a451f1b12eba91957d53f38ce09c8e2f Mon Sep 17 00:00:00 2001 From: Jan Eitzinger Date: Fri, 13 Feb 2026 12:19:31 +0100 Subject: [PATCH] Update job archive retention to uniform policy with json and parquet target format --- internal/taskmanager/retentionService.go | 410 ++++++++++++++++------- internal/taskmanager/taskManager.go | 43 +-- pkg/archive/ConfigSchema.go | 27 +- pkg/archive/parquet/target.go | 6 +- pkg/archive/parquet/writer.go | 68 ++++ pkg/archive/parquet/writer_test.go | 137 ++++++++ 6 files changed, 533 insertions(+), 158 deletions(-) diff --git a/internal/taskmanager/retentionService.go b/internal/taskmanager/retentionService.go index 453d10bc..d863bb91 100644 --- a/internal/taskmanager/retentionService.go +++ b/internal/taskmanager/retentionService.go @@ -6,157 +6,329 @@ package taskmanager import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" "time" "github.com/ClusterCockpit/cc-backend/pkg/archive" pqarchive "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" "github.com/go-co-op/gocron/v2" ) -func RegisterRetentionDeleteService(age int, includeDB bool, omitTagged bool) { +// createParquetTarget creates a ParquetTarget (file or S3) from the retention config. +func createParquetTarget(cfg Retention) (pqarchive.ParquetTarget, error) { + switch cfg.TargetKind { + case "s3": + return pqarchive.NewS3Target(pqarchive.S3TargetConfig{ + Endpoint: cfg.TargetEndpoint, + Bucket: cfg.TargetBucket, + AccessKey: cfg.TargetAccessKey, + SecretKey: cfg.TargetSecretKey, + Region: cfg.TargetRegion, + UsePathStyle: cfg.TargetUsePathStyle, + }) + default: + return pqarchive.NewFileTarget(cfg.TargetPath) + } +} + +// createTargetBackend creates a secondary archive backend (file or S3) for JSON copy/move. +func createTargetBackend(cfg Retention) (archive.ArchiveBackend, error) { + var raw json.RawMessage + var err error + + switch cfg.TargetKind { + case "s3": + raw, err = json.Marshal(map[string]interface{}{ + "kind": "s3", + "endpoint": cfg.TargetEndpoint, + "bucket": cfg.TargetBucket, + "access-key": cfg.TargetAccessKey, + "secret-key": cfg.TargetSecretKey, + "region": cfg.TargetRegion, + "use-path-style": cfg.TargetUsePathStyle, + }) + default: + raw, err = json.Marshal(map[string]string{ + "kind": "file", + "path": cfg.TargetPath, + }) + } + if err != nil { + return nil, fmt.Errorf("marshal target config: %w", err) + } + return archive.InitBackend(raw) +} + +// transferJobsJSON copies job data from source archive to target backend in JSON format. +func transferJobsJSON(jobs []*schema.Job, src archive.ArchiveBackend, dst archive.ArchiveBackend) error { + // Transfer cluster configs for all clusters referenced by jobs + clustersDone := make(map[string]bool) + for _, job := range jobs { + if clustersDone[job.Cluster] { + continue + } + clusterCfg, err := src.LoadClusterCfg(job.Cluster) + if err != nil { + cclog.Warnf("Retention: load cluster config %q: %v", job.Cluster, err) + } else { + if err := dst.StoreClusterCfg(job.Cluster, clusterCfg); err != nil { + cclog.Warnf("Retention: store cluster config %q: %v", job.Cluster, err) + } + } + clustersDone[job.Cluster] = true + } + + for _, job := range jobs { + meta, err := src.LoadJobMeta(job) + if err != nil { + cclog.Warnf("Retention: load meta for job %d: %v", job.JobID, err) + continue + } + data, err := src.LoadJobData(job) + if err != nil { + cclog.Warnf("Retention: load data for job %d: %v", job.JobID, err) + continue + } + if err := dst.ImportJob(meta, &data); err != nil { + cclog.Warnf("Retention: import job %d: %v", job.JobID, err) + continue + } + } + return nil +} + +// transferJobsParquet converts jobs to Parquet format, organized by cluster. +func transferJobsParquet(jobs []*schema.Job, src archive.ArchiveBackend, target pqarchive.ParquetTarget, maxSizeMB int) error { + cw := pqarchive.NewClusterAwareParquetWriter(target, maxSizeMB) + + // Set cluster configs for all clusters referenced by jobs + clustersDone := make(map[string]bool) + for _, job := range jobs { + if clustersDone[job.Cluster] { + continue + } + clusterCfg, err := src.LoadClusterCfg(job.Cluster) + if err != nil { + cclog.Warnf("Retention: load cluster config %q: %v", job.Cluster, err) + } else { + cw.SetClusterConfig(job.Cluster, clusterCfg) + } + clustersDone[job.Cluster] = true + } + + for _, job := range jobs { + meta, err := src.LoadJobMeta(job) + if err != nil { + cclog.Warnf("Retention: load meta for job %d: %v", job.JobID, err) + continue + } + data, err := src.LoadJobData(job) + if err != nil { + cclog.Warnf("Retention: load data for job %d: %v", job.JobID, err) + continue + } + row, err := pqarchive.JobToParquetRow(meta, &data) + if err != nil { + cclog.Warnf("Retention: convert job %d: %v", job.JobID, err) + continue + } + if err := cw.AddJob(*row); err != nil { + cclog.Errorf("Retention: add job %d to writer: %v", job.JobID, err) + continue + } + } + + return cw.Close() +} + +// cleanupAfterTransfer removes jobs from archive and optionally from DB. +func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged bool) { + archive.GetHandle().CleanUp(jobs) + + if includeDB { + cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged) + if err != nil { + cclog.Errorf("Retention: delete jobs from db: %v", err) + } else { + cclog.Infof("Retention: removed %d jobs from db", cnt) + } + if err = jobRepo.Optimize(); err != nil { + cclog.Errorf("Retention: db optimization error: %v", err) + } + } +} + +// readCopyMarker reads the last-processed timestamp from a copy marker file. +func readCopyMarker(cfg Retention) int64 { + var data []byte + var err error + + switch cfg.TargetKind { + case "s3": + // For S3 we store the marker locally alongside the config + data, err = os.ReadFile(copyMarkerPath(cfg)) + default: + data, err = os.ReadFile(filepath.Join(cfg.TargetPath, ".copy-marker")) + } + if err != nil { + return 0 + } + ts, err := strconv.ParseInt(strings.TrimSpace(string(data)), 10, 64) + if err != nil { + return 0 + } + return ts +} + +// writeCopyMarker writes the last-processed timestamp to a copy marker file. +func writeCopyMarker(cfg Retention, ts int64) { + content := []byte(strconv.FormatInt(ts, 10)) + var err error + + switch cfg.TargetKind { + case "s3": + err = os.WriteFile(copyMarkerPath(cfg), content, 0o640) + default: + err = os.WriteFile(filepath.Join(cfg.TargetPath, ".copy-marker"), content, 0o640) + } + if err != nil { + cclog.Warnf("Retention: write copy marker: %v", err) + } +} + +func copyMarkerPath(cfg Retention) string { + // For S3 targets, store the marker in a local temp-style path derived from the bucket name + return filepath.Join(os.TempDir(), fmt.Sprintf("cc-copy-marker-%s", cfg.TargetBucket)) +} + +func RegisterRetentionDeleteService(cfg Retention) { cclog.Info("Register retention delete service") s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(3, 0, 0))), gocron.NewTask( func() { - startTime := time.Now().Unix() - int64(age*24*3600) - jobs, err := jobRepo.FindJobsBetween(0, startTime, omitTagged) + startTime := time.Now().Unix() - int64(cfg.Age*24*3600) + jobs, err := jobRepo.FindJobsBetween(0, startTime, cfg.OmitTagged) if err != nil { - cclog.Warnf("Error while looking for retention jobs: %s", err.Error()) - } - archive.GetHandle().CleanUp(jobs) - - if includeDB { - cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged) - if err != nil { - cclog.Errorf("Error while deleting retention jobs from db: %s", err.Error()) - } else { - cclog.Infof("Retention: Removed %d jobs from db", cnt) - } - if err = jobRepo.Optimize(); err != nil { - cclog.Errorf("Error occured in db optimization: %s", err.Error()) - } - } - })) -} - -func RegisterRetentionMoveService(age int, includeDB bool, location string, omitTagged bool) { - cclog.Info("Register retention move service") - - s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(4, 0, 0))), - gocron.NewTask( - func() { - startTime := time.Now().Unix() - int64(age*24*3600) - jobs, err := jobRepo.FindJobsBetween(0, startTime, omitTagged) - if err != nil { - cclog.Warnf("Error while looking for retention jobs: %s", err.Error()) - } - archive.GetHandle().Move(jobs, location) - - if includeDB { - cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged) - if err != nil { - cclog.Errorf("Error while deleting retention jobs from db: %v", err) - } else { - cclog.Infof("Retention: Removed %d jobs from db", cnt) - } - if err = jobRepo.Optimize(); err != nil { - cclog.Errorf("Error occured in db optimization: %v", err) - } - } - })) -} - -func RegisterRetentionParquetService(retention Retention) { - cclog.Info("Register retention parquet service") - - maxFileSizeMB := retention.MaxFileSizeMB - if maxFileSizeMB <= 0 { - maxFileSizeMB = 512 - } - - var target pqarchive.ParquetTarget - var err error - - switch retention.TargetKind { - case "s3": - target, err = pqarchive.NewS3Target(pqarchive.S3TargetConfig{ - Endpoint: retention.TargetEndpoint, - Bucket: retention.TargetBucket, - AccessKey: retention.TargetAccessKey, - SecretKey: retention.TargetSecretKey, - Region: retention.TargetRegion, - UsePathStyle: retention.TargetUsePathStyle, - }) - default: - target, err = pqarchive.NewFileTarget(retention.TargetPath) - } - - if err != nil { - cclog.Errorf("Parquet retention: failed to create target: %v", err) - return - } - - s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(5, 0, 0))), - gocron.NewTask( - func() { - startTime := time.Now().Unix() - int64(retention.Age*24*3600) - jobs, err := jobRepo.FindJobsBetween(0, startTime, retention.OmitTagged) - if err != nil { - cclog.Warnf("Parquet retention: error finding jobs: %v", err) + cclog.Warnf("Retention delete: error finding jobs: %v", err) return } if len(jobs) == 0 { return } - cclog.Infof("Parquet retention: processing %d jobs", len(jobs)) - ar := archive.GetHandle() - pw := pqarchive.NewParquetWriter(target, maxFileSizeMB) + cclog.Infof("Retention delete: processing %d jobs", len(jobs)) + cleanupAfterTransfer(jobs, startTime, cfg.IncludeDB, cfg.OmitTagged) + })) +} - for _, job := range jobs { - meta, err := ar.LoadJobMeta(job) - if err != nil { - cclog.Warnf("Parquet retention: load meta for job %d: %v", job.JobID, err) - continue - } +func RegisterRetentionCopyService(cfg Retention) { + cclog.Infof("Register retention copy service (format=%s, target=%s)", cfg.Format, cfg.TargetKind) - data, err := ar.LoadJobData(job) - if err != nil { - cclog.Warnf("Parquet retention: load data for job %d: %v", job.JobID, err) - continue - } + maxFileSizeMB := cfg.MaxFileSizeMB + if maxFileSizeMB <= 0 { + maxFileSizeMB = 512 + } - row, err := pqarchive.JobToParquetRow(meta, &data) - if err != nil { - cclog.Warnf("Parquet retention: convert job %d: %v", job.JobID, err) - continue - } + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(4, 0, 0))), + gocron.NewTask( + func() { + cutoff := time.Now().Unix() - int64(cfg.Age*24*3600) + lastProcessed := readCopyMarker(cfg) - if err := pw.AddJob(*row); err != nil { - cclog.Errorf("Parquet retention: add job %d to writer: %v", job.JobID, err) - continue - } + jobs, err := jobRepo.FindJobsBetween(lastProcessed, cutoff, cfg.OmitTagged) + if err != nil { + cclog.Warnf("Retention copy: error finding jobs: %v", err) + return } - - if err := pw.Close(); err != nil { - cclog.Errorf("Parquet retention: close writer: %v", err) + if len(jobs) == 0 { return } - ar.CleanUp(jobs) + cclog.Infof("Retention copy: processing %d jobs", len(jobs)) + ar := archive.GetHandle() - if retention.IncludeDB { - cnt, err := jobRepo.DeleteJobsBefore(startTime, retention.OmitTagged) + switch cfg.Format { + case "parquet": + target, err := createParquetTarget(cfg) if err != nil { - cclog.Errorf("Parquet retention: delete jobs from db: %v", err) - } else { - cclog.Infof("Parquet retention: removed %d jobs from db", cnt) + cclog.Errorf("Retention copy: create parquet target: %v", err) + return } - if err = jobRepo.Optimize(); err != nil { - cclog.Errorf("Parquet retention: db optimization error: %v", err) + if err := transferJobsParquet(jobs, ar, target, maxFileSizeMB); err != nil { + cclog.Errorf("Retention copy: parquet transfer: %v", err) + return + } + default: // json + dst, err := createTargetBackend(cfg) + if err != nil { + cclog.Errorf("Retention copy: create target backend: %v", err) + return + } + if err := transferJobsJSON(jobs, ar, dst); err != nil { + cclog.Errorf("Retention copy: json transfer: %v", err) + return } } + + writeCopyMarker(cfg, cutoff) + })) +} + +func RegisterRetentionMoveService(cfg Retention) { + cclog.Infof("Register retention move service (format=%s, target=%s)", cfg.Format, cfg.TargetKind) + + maxFileSizeMB := cfg.MaxFileSizeMB + if maxFileSizeMB <= 0 { + maxFileSizeMB = 512 + } + + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(5, 0, 0))), + gocron.NewTask( + func() { + startTime := time.Now().Unix() - int64(cfg.Age*24*3600) + jobs, err := jobRepo.FindJobsBetween(0, startTime, cfg.OmitTagged) + if err != nil { + cclog.Warnf("Retention move: error finding jobs: %v", err) + return + } + if len(jobs) == 0 { + return + } + + cclog.Infof("Retention move: processing %d jobs", len(jobs)) + ar := archive.GetHandle() + + switch cfg.Format { + case "parquet": + target, err := createParquetTarget(cfg) + if err != nil { + cclog.Errorf("Retention move: create parquet target: %v", err) + return + } + if err := transferJobsParquet(jobs, ar, target, maxFileSizeMB); err != nil { + cclog.Errorf("Retention move: parquet transfer: %v", err) + return + } + default: // json + dst, err := createTargetBackend(cfg) + if err != nil { + cclog.Errorf("Retention move: create target backend: %v", err) + return + } + if err := transferJobsJSON(jobs, ar, dst); err != nil { + cclog.Errorf("Retention move: json transfer: %v", err) + return + } + } + + cleanupAfterTransfer(jobs, startTime, cfg.IncludeDB, cfg.OmitTagged) })) } diff --git a/internal/taskmanager/taskManager.go b/internal/taskmanager/taskManager.go index 8cf6b4e6..529395b5 100644 --- a/internal/taskmanager/taskManager.go +++ b/internal/taskmanager/taskManager.go @@ -23,20 +23,20 @@ const ( // Retention defines the configuration for job retention policies. type Retention struct { - Policy string `json:"policy"` - Location string `json:"location"` - Age int `json:"age"` - IncludeDB bool `json:"includeDB"` - OmitTagged bool `json:"omitTagged"` - TargetKind string `json:"target-kind"` - TargetPath string `json:"target-path"` - TargetEndpoint string `json:"target-endpoint"` - TargetBucket string `json:"target-bucket"` - TargetAccessKey string `json:"target-access-key"` - TargetSecretKey string `json:"target-secret-key"` - TargetRegion string `json:"target-region"` - TargetUsePathStyle bool `json:"target-use-path-style"` - MaxFileSizeMB int `json:"max-file-size-mb"` + Policy string `json:"policy"` + Format string `json:"format"` + Age int `json:"age"` + IncludeDB bool `json:"includeDB"` + OmitTagged bool `json:"omitTagged"` + TargetKind string `json:"target-kind"` + TargetPath string `json:"target-path"` + TargetEndpoint string `json:"target-endpoint"` + TargetBucket string `json:"target-bucket"` + TargetAccessKey string `json:"target-access-key"` + TargetSecretKey string `json:"target-secret-key"` + TargetRegion string `json:"target-region"` + TargetUsePathStyle bool `json:"target-use-path-style"` + MaxFileSizeMB int `json:"max-file-size-mb"` } // CronFrequency defines the execution intervals for various background workers. @@ -86,18 +86,11 @@ func initArchiveServices(config json.RawMessage) { switch cfg.Retention.Policy { case "delete": - RegisterRetentionDeleteService( - cfg.Retention.Age, - cfg.Retention.IncludeDB, - cfg.Retention.OmitTagged) + RegisterRetentionDeleteService(cfg.Retention) + case "copy": + RegisterRetentionCopyService(cfg.Retention) case "move": - RegisterRetentionMoveService( - cfg.Retention.Age, - cfg.Retention.IncludeDB, - cfg.Retention.Location, - cfg.Retention.OmitTagged) - case "parquet": - RegisterRetentionParquetService(cfg.Retention) + RegisterRetentionMoveService(cfg.Retention) } if cfg.Compression > 0 { diff --git a/pkg/archive/ConfigSchema.go b/pkg/archive/ConfigSchema.go index db568200..cb9b16bc 100644 --- a/pkg/archive/ConfigSchema.go +++ b/pkg/archive/ConfigSchema.go @@ -57,7 +57,12 @@ var configSchema = ` "policy": { "description": "Retention policy", "type": "string", - "enum": ["none", "delete", "move", "parquet"] + "enum": ["none", "delete", "copy", "move"] + }, + "format": { + "description": "Output format for copy/move policies", + "type": "string", + "enum": ["json", "parquet"] }, "include-db": { "description": "Also remove jobs from database", @@ -67,41 +72,37 @@ var configSchema = ` "description": "Act on jobs with startTime older than age (in days)", "type": "integer" }, - "location": { - "description": "The target directory for retention. Only applicable for retention move.", - "type": "string" - }, "target-kind": { - "description": "Target storage kind for parquet retention: file or s3", + "description": "Target storage kind: file or s3", "type": "string", "enum": ["file", "s3"] }, "target-path": { - "description": "Target directory path for parquet file storage", + "description": "Target directory path for file storage", "type": "string" }, "target-endpoint": { - "description": "S3 endpoint URL for parquet target", + "description": "S3 endpoint URL for target", "type": "string" }, "target-bucket": { - "description": "S3 bucket name for parquet target", + "description": "S3 bucket name for target", "type": "string" }, "target-access-key": { - "description": "S3 access key for parquet target", + "description": "S3 access key for target", "type": "string" }, "target-secret-key": { - "description": "S3 secret key for parquet target", + "description": "S3 secret key for target", "type": "string" }, "target-region": { - "description": "S3 region for parquet target", + "description": "S3 region for target", "type": "string" }, "target-use-path-style": { - "description": "Use path-style S3 URLs for parquet target", + "description": "Use path-style S3 URLs for target", "type": "boolean" }, "max-file-size-mb": { diff --git a/pkg/archive/parquet/target.go b/pkg/archive/parquet/target.go index 0e8babc2..090a230d 100644 --- a/pkg/archive/parquet/target.go +++ b/pkg/archive/parquet/target.go @@ -36,7 +36,11 @@ func NewFileTarget(path string) (*FileTarget, error) { } func (ft *FileTarget) WriteFile(name string, data []byte) error { - return os.WriteFile(filepath.Join(ft.path, name), data, 0o640) + fullPath := filepath.Join(ft.path, name) + if err := os.MkdirAll(filepath.Dir(fullPath), 0o750); err != nil { + return fmt.Errorf("create parent directory: %w", err) + } + return os.WriteFile(fullPath, data, 0o640) } // S3TargetConfig holds the configuration for an S3 parquet target. diff --git a/pkg/archive/parquet/writer.go b/pkg/archive/parquet/writer.go index ab56cace..2669a9c8 100644 --- a/pkg/archive/parquet/writer.go +++ b/pkg/archive/parquet/writer.go @@ -7,10 +7,13 @@ package parquet import ( "bytes" + "encoding/json" "fmt" + "path" "time" cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger" + "github.com/ClusterCockpit/cc-lib/v2/schema" pq "github.com/parquet-go/parquet-go" ) @@ -111,3 +114,68 @@ func estimateRowSize(row *ParquetJobRow) int64 { size += int64(len(row.MetricDataGz)) return size } + +// prefixedTarget wraps a ParquetTarget and prepends a path prefix to all file names. +type prefixedTarget struct { + inner ParquetTarget + prefix string +} + +func (pt *prefixedTarget) WriteFile(name string, data []byte) error { + return pt.inner.WriteFile(path.Join(pt.prefix, name), data) +} + +// ClusterAwareParquetWriter organizes Parquet output by cluster. +// Each cluster gets its own subdirectory with a cluster.json config file. +type ClusterAwareParquetWriter struct { + target ParquetTarget + maxSizeMB int + writers map[string]*ParquetWriter + clusterCfgs map[string]*schema.Cluster +} + +// NewClusterAwareParquetWriter creates a writer that routes jobs to per-cluster ParquetWriters. +func NewClusterAwareParquetWriter(target ParquetTarget, maxSizeMB int) *ClusterAwareParquetWriter { + return &ClusterAwareParquetWriter{ + target: target, + maxSizeMB: maxSizeMB, + writers: make(map[string]*ParquetWriter), + clusterCfgs: make(map[string]*schema.Cluster), + } +} + +// SetClusterConfig stores a cluster configuration to be written as cluster.json on Close. +func (cw *ClusterAwareParquetWriter) SetClusterConfig(name string, cfg *schema.Cluster) { + cw.clusterCfgs[name] = cfg +} + +// AddJob routes the job row to the appropriate per-cluster writer. +func (cw *ClusterAwareParquetWriter) AddJob(row ParquetJobRow) error { + cluster := row.Cluster + pw, ok := cw.writers[cluster] + if !ok { + pw = NewParquetWriter(&prefixedTarget{inner: cw.target, prefix: cluster}, cw.maxSizeMB) + cw.writers[cluster] = pw + } + return pw.AddJob(row) +} + +// Close writes cluster.json files and flushes all per-cluster writers. +func (cw *ClusterAwareParquetWriter) Close() error { + for name, cfg := range cw.clusterCfgs { + data, err := json.MarshalIndent(cfg, "", " ") + if err != nil { + return fmt.Errorf("marshal cluster config %q: %w", name, err) + } + if err := cw.target.WriteFile(path.Join(name, "cluster.json"), data); err != nil { + return fmt.Errorf("write cluster.json for %q: %w", name, err) + } + } + + for cluster, pw := range cw.writers { + if err := pw.Close(); err != nil { + return fmt.Errorf("close writer for cluster %q: %w", cluster, err) + } + } + return nil +} diff --git a/pkg/archive/parquet/writer_test.go b/pkg/archive/parquet/writer_test.go index e532e472..57b4ca4c 100644 --- a/pkg/archive/parquet/writer_test.go +++ b/pkg/archive/parquet/writer_test.go @@ -10,6 +10,9 @@ import ( "compress/gzip" "encoding/json" "io" + "os" + "path/filepath" + "strings" "sync" "testing" @@ -222,3 +225,137 @@ func TestFileTarget(t *testing.T) { // Verify file exists and has correct content // (using the target itself is sufficient; we just check no error) } + +func TestFileTargetSubdirectories(t *testing.T) { + dir := t.TempDir() + ft, err := NewFileTarget(dir) + if err != nil { + t.Fatalf("NewFileTarget: %v", err) + } + + testData := []byte("test data in subdir") + if err := ft.WriteFile("fritz/cc-archive-2025-01-20-001.parquet", testData); err != nil { + t.Fatalf("WriteFile with subdir: %v", err) + } + + // Verify file was created in subdirectory + content, err := os.ReadFile(filepath.Join(dir, "fritz", "cc-archive-2025-01-20-001.parquet")) + if err != nil { + t.Fatalf("read file in subdir: %v", err) + } + if !bytes.Equal(content, testData) { + t.Error("file content mismatch") + } +} + +func makeTestJobForCluster(jobID int64, cluster string) (*schema.Job, *schema.JobData) { + meta, data := makeTestJob(jobID) + meta.Cluster = cluster + return meta, data +} + +func TestClusterAwareParquetWriter(t *testing.T) { + target := newMemTarget() + cw := NewClusterAwareParquetWriter(target, 512) + + // Set cluster configs + cw.SetClusterConfig("fritz", &schema.Cluster{Name: "fritz"}) + cw.SetClusterConfig("alex", &schema.Cluster{Name: "alex"}) + + // Add jobs from different clusters + for i := int64(0); i < 3; i++ { + meta, data := makeTestJobForCluster(i, "fritz") + row, err := JobToParquetRow(meta, data) + if err != nil { + t.Fatalf("convert fritz job %d: %v", i, err) + } + if err := cw.AddJob(*row); err != nil { + t.Fatalf("add fritz job %d: %v", i, err) + } + } + + for i := int64(10); i < 12; i++ { + meta, data := makeTestJobForCluster(i, "alex") + row, err := JobToParquetRow(meta, data) + if err != nil { + t.Fatalf("convert alex job %d: %v", i, err) + } + if err := cw.AddJob(*row); err != nil { + t.Fatalf("add alex job %d: %v", i, err) + } + } + + if err := cw.Close(); err != nil { + t.Fatalf("close: %v", err) + } + + target.mu.Lock() + defer target.mu.Unlock() + + // Check cluster.json files were written + if _, ok := target.files["fritz/cluster.json"]; !ok { + t.Error("missing fritz/cluster.json") + } + if _, ok := target.files["alex/cluster.json"]; !ok { + t.Error("missing alex/cluster.json") + } + + // Verify cluster.json content + var clusterCfg schema.Cluster + if err := json.Unmarshal(target.files["fritz/cluster.json"], &clusterCfg); err != nil { + t.Fatalf("unmarshal fritz cluster.json: %v", err) + } + if clusterCfg.Name != "fritz" { + t.Errorf("fritz cluster name = %q, want %q", clusterCfg.Name, "fritz") + } + + // Check parquet files are in cluster subdirectories + fritzParquets := 0 + alexParquets := 0 + for name := range target.files { + if strings.HasPrefix(name, "fritz/") && strings.HasSuffix(name, ".parquet") { + fritzParquets++ + } + if strings.HasPrefix(name, "alex/") && strings.HasSuffix(name, ".parquet") { + alexParquets++ + } + } + if fritzParquets == 0 { + t.Error("no parquet files in fritz/") + } + if alexParquets == 0 { + t.Error("no parquet files in alex/") + } + + // Verify parquet files are readable and have correct row counts + for name, data := range target.files { + if !strings.HasSuffix(name, ".parquet") { + continue + } + file := bytes.NewReader(data) + pf, err := pq.OpenFile(file, int64(len(data))) + if err != nil { + t.Errorf("open parquet %s: %v", name, err) + continue + } + if strings.HasPrefix(name, "fritz/") && pf.NumRows() != 3 { + t.Errorf("fritz parquet rows = %d, want 3", pf.NumRows()) + } + if strings.HasPrefix(name, "alex/") && pf.NumRows() != 2 { + t.Errorf("alex parquet rows = %d, want 2", pf.NumRows()) + } + } +} + +func TestClusterAwareParquetWriterEmpty(t *testing.T) { + target := newMemTarget() + cw := NewClusterAwareParquetWriter(target, 512) + + if err := cw.Close(); err != nil { + t.Fatalf("close empty writer: %v", err) + } + + if len(target.files) != 0 { + t.Errorf("expected no files for empty writer, got %d", len(target.files)) + } +}