Mirror of https://github.com/ClusterCockpit/cc-backend
Update job archive retention to a uniform policy with json and parquet target formats
@@ -6,157 +6,329 @@
 package taskmanager
 
 import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
 	"time"
 
 	"github.com/ClusterCockpit/cc-backend/pkg/archive"
 	pqarchive "github.com/ClusterCockpit/cc-backend/pkg/archive/parquet"
 	cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
 	"github.com/ClusterCockpit/cc-lib/v2/schema"
 	"github.com/go-co-op/gocron/v2"
 )
 
+// createParquetTarget creates a ParquetTarget (file or S3) from the retention config.
+func createParquetTarget(cfg Retention) (pqarchive.ParquetTarget, error) {
+	switch cfg.TargetKind {
+	case "s3":
+		return pqarchive.NewS3Target(pqarchive.S3TargetConfig{
+			Endpoint:     cfg.TargetEndpoint,
+			Bucket:       cfg.TargetBucket,
+			AccessKey:    cfg.TargetAccessKey,
+			SecretKey:    cfg.TargetSecretKey,
+			Region:       cfg.TargetRegion,
+			UsePathStyle: cfg.TargetUsePathStyle,
+		})
+	default:
+		return pqarchive.NewFileTarget(cfg.TargetPath)
+	}
+}
+
+// createTargetBackend creates a secondary archive backend (file or S3) for JSON copy/move.
+func createTargetBackend(cfg Retention) (archive.ArchiveBackend, error) {
+	var raw json.RawMessage
+	var err error
+
+	switch cfg.TargetKind {
+	case "s3":
+		raw, err = json.Marshal(map[string]interface{}{
+			"kind":           "s3",
+			"endpoint":       cfg.TargetEndpoint,
+			"bucket":         cfg.TargetBucket,
+			"access-key":     cfg.TargetAccessKey,
+			"secret-key":     cfg.TargetSecretKey,
+			"region":         cfg.TargetRegion,
+			"use-path-style": cfg.TargetUsePathStyle,
+		})
+	default:
+		raw, err = json.Marshal(map[string]string{
+			"kind": "file",
+			"path": cfg.TargetPath,
+		})
+	}
+	if err != nil {
+		return nil, fmt.Errorf("marshal target config: %w", err)
+	}
+	return archive.InitBackend(raw)
+}
+
+// transferJobsJSON copies job data from the source archive to the target backend in JSON format.
+func transferJobsJSON(jobs []*schema.Job, src archive.ArchiveBackend, dst archive.ArchiveBackend) error {
+	// Transfer cluster configs for all clusters referenced by jobs
+	clustersDone := make(map[string]bool)
+	for _, job := range jobs {
+		if clustersDone[job.Cluster] {
+			continue
+		}
+		clusterCfg, err := src.LoadClusterCfg(job.Cluster)
+		if err != nil {
+			cclog.Warnf("Retention: load cluster config %q: %v", job.Cluster, err)
+		} else {
+			if err := dst.StoreClusterCfg(job.Cluster, clusterCfg); err != nil {
+				cclog.Warnf("Retention: store cluster config %q: %v", job.Cluster, err)
+			}
+		}
+		clustersDone[job.Cluster] = true
+	}
+
+	for _, job := range jobs {
+		meta, err := src.LoadJobMeta(job)
+		if err != nil {
+			cclog.Warnf("Retention: load meta for job %d: %v", job.JobID, err)
+			continue
+		}
+		data, err := src.LoadJobData(job)
+		if err != nil {
+			cclog.Warnf("Retention: load data for job %d: %v", job.JobID, err)
+			continue
+		}
+		if err := dst.ImportJob(meta, &data); err != nil {
+			cclog.Warnf("Retention: import job %d: %v", job.JobID, err)
+			continue
+		}
+	}
+	return nil
+}
+
+// transferJobsParquet converts jobs to Parquet format, organized by cluster.
+func transferJobsParquet(jobs []*schema.Job, src archive.ArchiveBackend, target pqarchive.ParquetTarget, maxSizeMB int) error {
+	cw := pqarchive.NewClusterAwareParquetWriter(target, maxSizeMB)
+
+	// Set cluster configs for all clusters referenced by jobs
+	clustersDone := make(map[string]bool)
+	for _, job := range jobs {
+		if clustersDone[job.Cluster] {
+			continue
+		}
+		clusterCfg, err := src.LoadClusterCfg(job.Cluster)
+		if err != nil {
+			cclog.Warnf("Retention: load cluster config %q: %v", job.Cluster, err)
+		} else {
+			cw.SetClusterConfig(job.Cluster, clusterCfg)
+		}
+		clustersDone[job.Cluster] = true
+	}
+
+	for _, job := range jobs {
+		meta, err := src.LoadJobMeta(job)
+		if err != nil {
+			cclog.Warnf("Retention: load meta for job %d: %v", job.JobID, err)
+			continue
+		}
+		data, err := src.LoadJobData(job)
+		if err != nil {
+			cclog.Warnf("Retention: load data for job %d: %v", job.JobID, err)
+			continue
+		}
+		row, err := pqarchive.JobToParquetRow(meta, &data)
+		if err != nil {
+			cclog.Warnf("Retention: convert job %d: %v", job.JobID, err)
+			continue
+		}
+		if err := cw.AddJob(*row); err != nil {
+			cclog.Errorf("Retention: add job %d to writer: %v", job.JobID, err)
+			continue
+		}
+	}
+
+	return cw.Close()
+}
+
+// cleanupAfterTransfer removes jobs from the archive and optionally from the DB.
+func cleanupAfterTransfer(jobs []*schema.Job, startTime int64, includeDB bool, omitTagged bool) {
+	archive.GetHandle().CleanUp(jobs)
+
+	if includeDB {
+		cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged)
+		if err != nil {
+			cclog.Errorf("Retention: delete jobs from db: %v", err)
+		} else {
+			cclog.Infof("Retention: removed %d jobs from db", cnt)
+		}
+		if err = jobRepo.Optimize(); err != nil {
+			cclog.Errorf("Retention: db optimization error: %v", err)
+		}
+	}
+}
+
+// readCopyMarker reads the last-processed timestamp from a copy marker file.
+func readCopyMarker(cfg Retention) int64 {
+	var data []byte
+	var err error
+
+	switch cfg.TargetKind {
+	case "s3":
+		// For S3 we store the marker locally alongside the config
+		data, err = os.ReadFile(copyMarkerPath(cfg))
+	default:
+		data, err = os.ReadFile(filepath.Join(cfg.TargetPath, ".copy-marker"))
+	}
+	if err != nil {
+		return 0
+	}
+	ts, err := strconv.ParseInt(strings.TrimSpace(string(data)), 10, 64)
+	if err != nil {
+		return 0
+	}
+	return ts
+}
+
+// writeCopyMarker writes the last-processed timestamp to a copy marker file.
+func writeCopyMarker(cfg Retention, ts int64) {
+	content := []byte(strconv.FormatInt(ts, 10))
+	var err error
+
+	switch cfg.TargetKind {
+	case "s3":
+		err = os.WriteFile(copyMarkerPath(cfg), content, 0o640)
+	default:
+		err = os.WriteFile(filepath.Join(cfg.TargetPath, ".copy-marker"), content, 0o640)
+	}
+	if err != nil {
+		cclog.Warnf("Retention: write copy marker: %v", err)
+	}
+}
+
+func copyMarkerPath(cfg Retention) string {
+	// For S3 targets, store the marker in a local temp-style path derived from the bucket name
+	return filepath.Join(os.TempDir(), fmt.Sprintf("cc-copy-marker-%s", cfg.TargetBucket))
+}
+
-func RegisterRetentionDeleteService(age int, includeDB bool, omitTagged bool) {
+func RegisterRetentionDeleteService(cfg Retention) {
 	cclog.Info("Register retention delete service")
 
 	s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(3, 0, 0))),
 		gocron.NewTask(
 			func() {
-				startTime := time.Now().Unix() - int64(age*24*3600)
-				jobs, err := jobRepo.FindJobsBetween(0, startTime, omitTagged)
+				startTime := time.Now().Unix() - int64(cfg.Age*24*3600)
+				jobs, err := jobRepo.FindJobsBetween(0, startTime, cfg.OmitTagged)
 				if err != nil {
-					cclog.Warnf("Error while looking for retention jobs: %s", err.Error())
+					cclog.Warnf("Retention delete: error finding jobs: %v", err)
+					return
 				}
-				archive.GetHandle().CleanUp(jobs)
-
-				if includeDB {
-					cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged)
-					if err != nil {
-						cclog.Errorf("Error while deleting retention jobs from db: %s", err.Error())
-					} else {
-						cclog.Infof("Retention: Removed %d jobs from db", cnt)
-					}
-					if err = jobRepo.Optimize(); err != nil {
-						cclog.Errorf("Error occured in db optimization: %s", err.Error())
-					}
-				}
+				if len(jobs) == 0 {
+					return
+				}
+
+				cclog.Infof("Retention delete: processing %d jobs", len(jobs))
+				cleanupAfterTransfer(jobs, startTime, cfg.IncludeDB, cfg.OmitTagged)
 			}))
 }
 
-func RegisterRetentionMoveService(age int, includeDB bool, location string, omitTagged bool) {
-	cclog.Info("Register retention move service")
-
-	s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(4, 0, 0))),
-		gocron.NewTask(
-			func() {
-				startTime := time.Now().Unix() - int64(age*24*3600)
-				jobs, err := jobRepo.FindJobsBetween(0, startTime, omitTagged)
-				if err != nil {
-					cclog.Warnf("Error while looking for retention jobs: %s", err.Error())
-				}
-				archive.GetHandle().Move(jobs, location)
-
-				if includeDB {
-					cnt, err := jobRepo.DeleteJobsBefore(startTime, omitTagged)
-					if err != nil {
-						cclog.Errorf("Error while deleting retention jobs from db: %v", err)
-					} else {
-						cclog.Infof("Retention: Removed %d jobs from db", cnt)
-					}
-					if err = jobRepo.Optimize(); err != nil {
-						cclog.Errorf("Error occured in db optimization: %v", err)
-					}
-				}
-			}))
-}
+func RegisterRetentionCopyService(cfg Retention) {
+	cclog.Infof("Register retention copy service (format=%s, target=%s)", cfg.Format, cfg.TargetKind)
+
+	maxFileSizeMB := cfg.MaxFileSizeMB
+	if maxFileSizeMB <= 0 {
+		maxFileSizeMB = 512
+	}
+
+	s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(4, 0, 0))),
+		gocron.NewTask(
+			func() {
+				cutoff := time.Now().Unix() - int64(cfg.Age*24*3600)
+				lastProcessed := readCopyMarker(cfg)
+
+				jobs, err := jobRepo.FindJobsBetween(lastProcessed, cutoff, cfg.OmitTagged)
+				if err != nil {
+					cclog.Warnf("Retention copy: error finding jobs: %v", err)
+					return
+				}
+				if len(jobs) == 0 {
+					return
+				}
+
+				cclog.Infof("Retention copy: processing %d jobs", len(jobs))
+				ar := archive.GetHandle()
+
+				switch cfg.Format {
+				case "parquet":
+					target, err := createParquetTarget(cfg)
+					if err != nil {
+						cclog.Errorf("Retention copy: create parquet target: %v", err)
+						return
+					}
+					if err := transferJobsParquet(jobs, ar, target, maxFileSizeMB); err != nil {
+						cclog.Errorf("Retention copy: parquet transfer: %v", err)
+						return
+					}
+				default: // json
+					dst, err := createTargetBackend(cfg)
+					if err != nil {
+						cclog.Errorf("Retention copy: create target backend: %v", err)
+						return
+					}
+					if err := transferJobsJSON(jobs, ar, dst); err != nil {
+						cclog.Errorf("Retention copy: json transfer: %v", err)
+						return
+					}
+				}
+
+				writeCopyMarker(cfg, cutoff)
+			}))
+}
 
-func RegisterRetentionParquetService(retention Retention) {
-	cclog.Info("Register retention parquet service")
-
-	maxFileSizeMB := retention.MaxFileSizeMB
-	if maxFileSizeMB <= 0 {
-		maxFileSizeMB = 512
-	}
-
-	var target pqarchive.ParquetTarget
-	var err error
-
-	switch retention.TargetKind {
-	case "s3":
-		target, err = pqarchive.NewS3Target(pqarchive.S3TargetConfig{
-			Endpoint:     retention.TargetEndpoint,
-			Bucket:       retention.TargetBucket,
-			AccessKey:    retention.TargetAccessKey,
-			SecretKey:    retention.TargetSecretKey,
-			Region:       retention.TargetRegion,
-			UsePathStyle: retention.TargetUsePathStyle,
-		})
-	default:
-		target, err = pqarchive.NewFileTarget(retention.TargetPath)
-	}
-
-	if err != nil {
-		cclog.Errorf("Parquet retention: failed to create target: %v", err)
-		return
-	}
-
-	s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(5, 0, 0))),
-		gocron.NewTask(
-			func() {
-				startTime := time.Now().Unix() - int64(retention.Age*24*3600)
-				jobs, err := jobRepo.FindJobsBetween(0, startTime, retention.OmitTagged)
-				if err != nil {
-					cclog.Warnf("Parquet retention: error finding jobs: %v", err)
-					return
-				}
-				if len(jobs) == 0 {
-					return
-				}
-
-				cclog.Infof("Parquet retention: processing %d jobs", len(jobs))
-				ar := archive.GetHandle()
-				pw := pqarchive.NewParquetWriter(target, maxFileSizeMB)
-
-				for _, job := range jobs {
-					meta, err := ar.LoadJobMeta(job)
-					if err != nil {
-						cclog.Warnf("Parquet retention: load meta for job %d: %v", job.JobID, err)
-						continue
-					}
-					data, err := ar.LoadJobData(job)
-					if err != nil {
-						cclog.Warnf("Parquet retention: load data for job %d: %v", job.JobID, err)
-						continue
-					}
-					row, err := pqarchive.JobToParquetRow(meta, &data)
-					if err != nil {
-						cclog.Warnf("Parquet retention: convert job %d: %v", job.JobID, err)
-						continue
-					}
-					if err := pw.AddJob(*row); err != nil {
-						cclog.Errorf("Parquet retention: add job %d to writer: %v", job.JobID, err)
-						continue
-					}
-				}
-
-				if err := pw.Close(); err != nil {
-					cclog.Errorf("Parquet retention: close writer: %v", err)
-				}
-
-				ar.CleanUp(jobs)
-
-				if retention.IncludeDB {
-					cnt, err := jobRepo.DeleteJobsBefore(startTime, retention.OmitTagged)
-					if err != nil {
-						cclog.Errorf("Parquet retention: delete jobs from db: %v", err)
-					} else {
-						cclog.Infof("Parquet retention: removed %d jobs from db", cnt)
-					}
-					if err = jobRepo.Optimize(); err != nil {
-						cclog.Errorf("Parquet retention: db optimization error: %v", err)
-					}
-				}
-			}))
-}
+func RegisterRetentionMoveService(cfg Retention) {
+	cclog.Infof("Register retention move service (format=%s, target=%s)", cfg.Format, cfg.TargetKind)
+
+	maxFileSizeMB := cfg.MaxFileSizeMB
+	if maxFileSizeMB <= 0 {
+		maxFileSizeMB = 512
+	}
+
+	s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(5, 0, 0))),
+		gocron.NewTask(
+			func() {
+				startTime := time.Now().Unix() - int64(cfg.Age*24*3600)
+				jobs, err := jobRepo.FindJobsBetween(0, startTime, cfg.OmitTagged)
+				if err != nil {
+					cclog.Warnf("Retention move: error finding jobs: %v", err)
+					return
+				}
+				if len(jobs) == 0 {
+					return
+				}
+
+				cclog.Infof("Retention move: processing %d jobs", len(jobs))
+				ar := archive.GetHandle()
+
+				switch cfg.Format {
+				case "parquet":
+					target, err := createParquetTarget(cfg)
+					if err != nil {
+						cclog.Errorf("Retention move: create parquet target: %v", err)
+						return
+					}
+					if err := transferJobsParquet(jobs, ar, target, maxFileSizeMB); err != nil {
+						cclog.Errorf("Retention move: parquet transfer: %v", err)
+						return
+					}
+				default: // json
+					dst, err := createTargetBackend(cfg)
+					if err != nil {
+						cclog.Errorf("Retention move: create target backend: %v", err)
+						return
+					}
+					if err := transferJobsJSON(jobs, ar, dst); err != nil {
+						cclog.Errorf("Retention move: json transfer: %v", err)
+						return
+					}
+				}
+
+				cleanupAfterTransfer(jobs, startTime, cfg.IncludeDB, cfg.OmitTagged)
+			}))
+}
@@ -23,20 +23,20 @@ const (
 
 // Retention defines the configuration for job retention policies.
 type Retention struct {
 	Policy             string `json:"policy"`
-	Location           string `json:"location"`
+	Format             string `json:"format"`
 	Age                int    `json:"age"`
 	IncludeDB          bool   `json:"includeDB"`
 	OmitTagged         bool   `json:"omitTagged"`
 	TargetKind         string `json:"target-kind"`
 	TargetPath         string `json:"target-path"`
 	TargetEndpoint     string `json:"target-endpoint"`
 	TargetBucket       string `json:"target-bucket"`
 	TargetAccessKey    string `json:"target-access-key"`
 	TargetSecretKey    string `json:"target-secret-key"`
 	TargetRegion       string `json:"target-region"`
 	TargetUsePathStyle bool   `json:"target-use-path-style"`
 	MaxFileSizeMB      int    `json:"max-file-size-mb"`
 }
 
 // CronFrequency defines the execution intervals for various background workers.
@@ -86,18 +86,11 @@ func initArchiveServices(config json.RawMessage) {
 
 	switch cfg.Retention.Policy {
 	case "delete":
-		RegisterRetentionDeleteService(
-			cfg.Retention.Age,
-			cfg.Retention.IncludeDB,
-			cfg.Retention.OmitTagged)
+		RegisterRetentionDeleteService(cfg.Retention)
+	case "copy":
+		RegisterRetentionCopyService(cfg.Retention)
 	case "move":
-		RegisterRetentionMoveService(
-			cfg.Retention.Age,
-			cfg.Retention.IncludeDB,
-			cfg.Retention.Location,
-			cfg.Retention.OmitTagged)
-	case "parquet":
-		RegisterRetentionParquetService(cfg.Retention)
+		RegisterRetentionMoveService(cfg.Retention)
 	}
 
 	if cfg.Compression > 0 {
@@ -57,7 +57,12 @@ var configSchema = `
 		"policy": {
 			"description": "Retention policy",
 			"type": "string",
-			"enum": ["none", "delete", "move", "parquet"]
+			"enum": ["none", "delete", "copy", "move"]
 		},
+		"format": {
+			"description": "Output format for copy/move policies",
+			"type": "string",
+			"enum": ["json", "parquet"]
+		},
 		"include-db": {
 			"description": "Also remove jobs from database",
@@ -67,41 +72,37 @@ var configSchema = `
 			"description": "Act on jobs with startTime older than age (in days)",
 			"type": "integer"
 		},
-		"location": {
-			"description": "The target directory for retention. Only applicable for retention move.",
-			"type": "string"
-		},
 		"target-kind": {
-			"description": "Target storage kind for parquet retention: file or s3",
+			"description": "Target storage kind: file or s3",
 			"type": "string",
 			"enum": ["file", "s3"]
 		},
 		"target-path": {
-			"description": "Target directory path for parquet file storage",
+			"description": "Target directory path for file storage",
 			"type": "string"
 		},
 		"target-endpoint": {
-			"description": "S3 endpoint URL for parquet target",
+			"description": "S3 endpoint URL for target",
 			"type": "string"
 		},
 		"target-bucket": {
-			"description": "S3 bucket name for parquet target",
+			"description": "S3 bucket name for target",
 			"type": "string"
 		},
 		"target-access-key": {
-			"description": "S3 access key for parquet target",
+			"description": "S3 access key for target",
 			"type": "string"
 		},
 		"target-secret-key": {
-			"description": "S3 secret key for parquet target",
+			"description": "S3 secret key for target",
 			"type": "string"
 		},
 		"target-region": {
-			"description": "S3 region for parquet target",
+			"description": "S3 region for target",
 			"type": "string"
 		},
 		"target-use-path-style": {
-			"description": "Use path-style S3 URLs for parquet target",
+			"description": "Use path-style S3 URLs for target",
 			"type": "boolean"
 		},
 		"max-file-size-mb": {
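For orientation, here is a minimal illustrative sketch of a retention block under the new schema. Values are invented, and key spelling should be checked against the repository (the schema above lists include-db, while the Retention struct tag reads includeDB):

	"retention": {
		"policy": "copy",
		"format": "parquet",
		"age": 180,
		"include-db": false,
		"target-kind": "s3",
		"target-endpoint": "https://s3.example.internal",
		"target-bucket": "cc-archive",
		"target-region": "us-east-1",
		"target-use-path-style": true,
		"max-file-size-mb": 512
	}

With policy "copy" and format "parquet", jobs older than age days are written to the target via transferJobsParquet, and the copy marker keeps subsequent runs incremental; a file target would instead set "target-kind": "file" and "target-path".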
@@ -36,7 +36,11 @@ func NewFileTarget(path string) (*FileTarget, error) {
 }
 
 func (ft *FileTarget) WriteFile(name string, data []byte) error {
-	return os.WriteFile(filepath.Join(ft.path, name), data, 0o640)
+	fullPath := filepath.Join(ft.path, name)
+	if err := os.MkdirAll(filepath.Dir(fullPath), 0o750); err != nil {
+		return fmt.Errorf("create parent directory: %w", err)
+	}
+	return os.WriteFile(fullPath, data, 0o640)
 }
 
 // S3TargetConfig holds the configuration for an S3 parquet target.
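Note on the WriteFile change above: the added os.MkdirAll is what lets a plain FileTarget accept nested names such as fritz/cc-archive-2025-01-20-001.parquet, which the per-cluster layout introduced below depends on; the previous single os.WriteFile call failed whenever the parent directory did not exist.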
@@ -7,10 +7,13 @@ package parquet
 
 import (
 	"bytes"
+	"encoding/json"
+	"fmt"
+	"path"
 	"time"
 
 	cclog "github.com/ClusterCockpit/cc-lib/v2/ccLogger"
 	"github.com/ClusterCockpit/cc-lib/v2/schema"
 	pq "github.com/parquet-go/parquet-go"
 )
@@ -111,3 +114,68 @@ func estimateRowSize(row *ParquetJobRow) int64 {
 	size += int64(len(row.MetricDataGz))
 	return size
 }
+
+// prefixedTarget wraps a ParquetTarget and prepends a path prefix to all file names.
+type prefixedTarget struct {
+	inner  ParquetTarget
+	prefix string
+}
+
+func (pt *prefixedTarget) WriteFile(name string, data []byte) error {
+	return pt.inner.WriteFile(path.Join(pt.prefix, name), data)
+}
+
+// ClusterAwareParquetWriter organizes Parquet output by cluster.
+// Each cluster gets its own subdirectory with a cluster.json config file.
+type ClusterAwareParquetWriter struct {
+	target      ParquetTarget
+	maxSizeMB   int
+	writers     map[string]*ParquetWriter
+	clusterCfgs map[string]*schema.Cluster
+}
+
+// NewClusterAwareParquetWriter creates a writer that routes jobs to per-cluster ParquetWriters.
+func NewClusterAwareParquetWriter(target ParquetTarget, maxSizeMB int) *ClusterAwareParquetWriter {
+	return &ClusterAwareParquetWriter{
+		target:      target,
+		maxSizeMB:   maxSizeMB,
+		writers:     make(map[string]*ParquetWriter),
+		clusterCfgs: make(map[string]*schema.Cluster),
+	}
+}
+
+// SetClusterConfig stores a cluster configuration to be written as cluster.json on Close.
+func (cw *ClusterAwareParquetWriter) SetClusterConfig(name string, cfg *schema.Cluster) {
+	cw.clusterCfgs[name] = cfg
+}
+
+// AddJob routes the job row to the appropriate per-cluster writer.
+func (cw *ClusterAwareParquetWriter) AddJob(row ParquetJobRow) error {
+	cluster := row.Cluster
+	pw, ok := cw.writers[cluster]
+	if !ok {
+		pw = NewParquetWriter(&prefixedTarget{inner: cw.target, prefix: cluster}, cw.maxSizeMB)
+		cw.writers[cluster] = pw
+	}
+	return pw.AddJob(row)
+}
+
+// Close writes cluster.json files and flushes all per-cluster writers.
+func (cw *ClusterAwareParquetWriter) Close() error {
+	for name, cfg := range cw.clusterCfgs {
+		data, err := json.MarshalIndent(cfg, "", " ")
+		if err != nil {
+			return fmt.Errorf("marshal cluster config %q: %w", name, err)
+		}
+		if err := cw.target.WriteFile(path.Join(name, "cluster.json"), data); err != nil {
+			return fmt.Errorf("write cluster.json for %q: %w", name, err)
+		}
+	}
+
+	for cluster, pw := range cw.writers {
+		if err := pw.Close(); err != nil {
+			return fmt.Errorf("close writer for cluster %q: %w", cluster, err)
+		}
+	}
+	return nil
+}
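Taken together with prefixedTarget, the writer lays out the target as one subtree per cluster. A sketch of the resulting layout (the parquet file name shown is the one used in the subdirectory test below; real names are generated by ParquetWriter):

	fritz/cluster.json
	fritz/cc-archive-2025-01-20-001.parquet
	alex/cluster.json
	alex/<generated-name>.parquet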
@@ -10,6 +10,9 @@ import (
 	"compress/gzip"
 	"encoding/json"
 	"io"
+	"os"
+	"path/filepath"
+	"strings"
 	"sync"
 	"testing"
@@ -222,3 +225,137 @@ func TestFileTarget(t *testing.T) {
 	// Verify file exists and has correct content
 	// (using the target itself is sufficient; we just check no error)
 }
+
+func TestFileTargetSubdirectories(t *testing.T) {
+	dir := t.TempDir()
+	ft, err := NewFileTarget(dir)
+	if err != nil {
+		t.Fatalf("NewFileTarget: %v", err)
+	}
+
+	testData := []byte("test data in subdir")
+	if err := ft.WriteFile("fritz/cc-archive-2025-01-20-001.parquet", testData); err != nil {
+		t.Fatalf("WriteFile with subdir: %v", err)
+	}
+
+	// Verify file was created in subdirectory
+	content, err := os.ReadFile(filepath.Join(dir, "fritz", "cc-archive-2025-01-20-001.parquet"))
+	if err != nil {
+		t.Fatalf("read file in subdir: %v", err)
+	}
+	if !bytes.Equal(content, testData) {
+		t.Error("file content mismatch")
+	}
+}
+
+func makeTestJobForCluster(jobID int64, cluster string) (*schema.Job, *schema.JobData) {
+	meta, data := makeTestJob(jobID)
+	meta.Cluster = cluster
+	return meta, data
+}
+
+func TestClusterAwareParquetWriter(t *testing.T) {
+	target := newMemTarget()
+	cw := NewClusterAwareParquetWriter(target, 512)
+
+	// Set cluster configs
+	cw.SetClusterConfig("fritz", &schema.Cluster{Name: "fritz"})
+	cw.SetClusterConfig("alex", &schema.Cluster{Name: "alex"})
+
+	// Add jobs from different clusters
+	for i := int64(0); i < 3; i++ {
+		meta, data := makeTestJobForCluster(i, "fritz")
+		row, err := JobToParquetRow(meta, data)
+		if err != nil {
+			t.Fatalf("convert fritz job %d: %v", i, err)
+		}
+		if err := cw.AddJob(*row); err != nil {
+			t.Fatalf("add fritz job %d: %v", i, err)
+		}
+	}
+
+	for i := int64(10); i < 12; i++ {
+		meta, data := makeTestJobForCluster(i, "alex")
+		row, err := JobToParquetRow(meta, data)
+		if err != nil {
+			t.Fatalf("convert alex job %d: %v", i, err)
+		}
+		if err := cw.AddJob(*row); err != nil {
+			t.Fatalf("add alex job %d: %v", i, err)
+		}
+	}
+
+	if err := cw.Close(); err != nil {
+		t.Fatalf("close: %v", err)
+	}
+
+	target.mu.Lock()
+	defer target.mu.Unlock()
+
+	// Check cluster.json files were written
+	if _, ok := target.files["fritz/cluster.json"]; !ok {
+		t.Error("missing fritz/cluster.json")
+	}
+	if _, ok := target.files["alex/cluster.json"]; !ok {
+		t.Error("missing alex/cluster.json")
+	}
+
+	// Verify cluster.json content
+	var clusterCfg schema.Cluster
+	if err := json.Unmarshal(target.files["fritz/cluster.json"], &clusterCfg); err != nil {
+		t.Fatalf("unmarshal fritz cluster.json: %v", err)
+	}
+	if clusterCfg.Name != "fritz" {
+		t.Errorf("fritz cluster name = %q, want %q", clusterCfg.Name, "fritz")
+	}
+
+	// Check parquet files are in cluster subdirectories
+	fritzParquets := 0
+	alexParquets := 0
+	for name := range target.files {
+		if strings.HasPrefix(name, "fritz/") && strings.HasSuffix(name, ".parquet") {
+			fritzParquets++
+		}
+		if strings.HasPrefix(name, "alex/") && strings.HasSuffix(name, ".parquet") {
+			alexParquets++
+		}
+	}
+	if fritzParquets == 0 {
+		t.Error("no parquet files in fritz/")
+	}
+	if alexParquets == 0 {
+		t.Error("no parquet files in alex/")
+	}
+
+	// Verify parquet files are readable and have correct row counts
+	for name, data := range target.files {
+		if !strings.HasSuffix(name, ".parquet") {
+			continue
+		}
+		file := bytes.NewReader(data)
+		pf, err := pq.OpenFile(file, int64(len(data)))
+		if err != nil {
+			t.Errorf("open parquet %s: %v", name, err)
+			continue
+		}
+		if strings.HasPrefix(name, "fritz/") && pf.NumRows() != 3 {
+			t.Errorf("fritz parquet rows = %d, want 3", pf.NumRows())
+		}
+		if strings.HasPrefix(name, "alex/") && pf.NumRows() != 2 {
+			t.Errorf("alex parquet rows = %d, want 2", pf.NumRows())
+		}
+	}
+}
+
+func TestClusterAwareParquetWriterEmpty(t *testing.T) {
+	target := newMemTarget()
+	cw := NewClusterAwareParquetWriter(target, 512)
+
+	if err := cw.Close(); err != nil {
+		t.Fatalf("close empty writer: %v", err)
+	}
+
+	if len(target.files) != 0 {
+		t.Errorf("expected no files for empty writer, got %d", len(target.files))
+	}
+}