Add sqlite and s3 job archive backend
Add documentation, extend config
@@ -5,10 +5,843 @@
package archive

import (
    "bytes"
    "compress/gzip"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "math"
    "os"
    "strconv"
    "strings"
    "text/tabwriter"
    "time"

    "github.com/ClusterCockpit/cc-backend/internal/config"
    cclog "github.com/ClusterCockpit/cc-lib/ccLogger"
    "github.com/ClusterCockpit/cc-lib/schema"
    "github.com/ClusterCockpit/cc-lib/util"
    "github.com/aws/aws-sdk-go-v2/aws"
    awsconfig "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/credentials"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

// S3ArchiveConfig holds the configuration for the S3 archive backend.
type S3ArchiveConfig struct {
    Path         string `json:"filePath"`
    Endpoint     string `json:"endpoint"`     // S3 endpoint URL (optional, for MinIO/localstack)
    AccessKey    string `json:"accessKey"`    // AWS access key ID
    SecretKey    string `json:"secretKey"`    // AWS secret access key
    Bucket       string `json:"bucket"`       // S3 bucket name
    Region       string `json:"region"`       // AWS region
    UsePathStyle bool   `json:"usePathStyle"` // Use path-style URLs (required for MinIO)
}
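
A usage sketch (not part of this commit): the raw JSON below mirrors the json tags of S3ArchiveConfig; the endpoint, credentials, and bucket values are made up, and the snippet assumes it runs inside the same archive package.

    rawCfg := json.RawMessage(`{
        "endpoint":     "http://localhost:9000",
        "accessKey":    "minioadmin",
        "secretKey":    "minioadmin",
        "bucket":       "cc-job-archive",
        "region":       "us-east-1",
        "usePathStyle": true
    }`)

    var s3a S3Archive
    if _, err := s3a.Init(rawCfg); err != nil {
        cclog.Fatalf("cannot initialize S3 archive: %v", err)
    }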

// S3Archive implements ArchiveBackend using AWS S3 or S3-compatible object storage.
// Jobs are stored as objects with keys mirroring the filesystem structure.
//
// Object key structure: <cluster>/<jobid/1000>/<jobid%1000>/<starttime>/meta.json
type S3Archive struct {
    path     string
    client   *s3.Client // AWS S3 client
    bucket   string     // S3 bucket name
    clusters []string   // List of discovered cluster names
}

// getS3Key generates the S3 object key for a job file
func getS3Key(job *schema.Job, file string) string {
    lvl1 := fmt.Sprintf("%d", job.JobID/1000)
    lvl2 := fmt.Sprintf("%03d", job.JobID%1000)
    startTime := strconv.FormatInt(job.StartTime, 10)
    return fmt.Sprintf("%s/%s/%s/%s/%s", job.Cluster, lvl1, lvl2, startTime, file)
}

// getS3Directory generates the S3 key prefix for a job directory
func getS3Directory(job *schema.Job) string {
    lvl1 := fmt.Sprintf("%d", job.JobID/1000)
    lvl2 := fmt.Sprintf("%03d", job.JobID%1000)
    startTime := strconv.FormatInt(job.StartTime, 10)
    return fmt.Sprintf("%s/%s/%s/%s/", job.Cluster, lvl1, lvl2, startTime)
}
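
To make the key layout concrete, a small sketch with made-up values for the schema.Job fields used above:

    job := &schema.Job{Cluster: "fritz", JobID: 1234567, StartTime: 1700000000}

    fmt.Println(getS3Key(job, "meta.json")) // fritz/1234/567/1700000000/meta.json
    fmt.Println(getS3Directory(job))        // fritz/1234/567/1700000000/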

func (s3a *S3Archive) Init(rawConfig json.RawMessage) (uint64, error) {
    var cfg S3ArchiveConfig
    if err := json.Unmarshal(rawConfig, &cfg); err != nil {
        cclog.Warnf("S3Archive Init() > Unmarshal error: %#v", err)
        return 0, err
    }

    if cfg.Bucket == "" {
        err := fmt.Errorf("S3Archive Init(): empty bucket name")
        cclog.Errorf("S3Archive Init() > config error: %v", err)
        return 0, err
    }

    if cfg.Region == "" {
        cfg.Region = "us-east-1" // Default region
    }

    ctx := context.Background()

    // Create custom AWS config
    var awsCfg aws.Config
    var err error

    if cfg.AccessKey != "" && cfg.SecretKey != "" {
        // Use static credentials
        customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
            if cfg.Endpoint != "" {
                return aws.Endpoint{
                    URL:               cfg.Endpoint,
                    HostnameImmutable: cfg.UsePathStyle,
                    Source:            aws.EndpointSourceCustom,
                }, nil
            }
            return aws.Endpoint{}, &aws.EndpointNotFoundError{}
        })

        awsCfg, err = awsconfig.LoadDefaultConfig(ctx,
            awsconfig.WithRegion(cfg.Region),
            awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
                cfg.AccessKey,
                cfg.SecretKey,
                "",
            )),
            awsconfig.WithEndpointResolverWithOptions(customResolver),
        )
    } else {
        // Use default credential chain
        awsCfg, err = awsconfig.LoadDefaultConfig(ctx,
            awsconfig.WithRegion(cfg.Region),
        )
    }

    if err != nil {
        cclog.Errorf("S3Archive Init() > failed to load AWS config: %v", err)
        return 0, err
    }

    // Create S3 client with path-style option
    s3a.client = s3.NewFromConfig(awsCfg, func(o *s3.Options) {
        o.UsePathStyle = cfg.UsePathStyle
    })
    s3a.bucket = cfg.Bucket

    // Check if bucket exists and is accessible
    _, err = s3a.client.HeadBucket(ctx, &s3.HeadBucketInput{
        Bucket: aws.String(s3a.bucket),
    })
    if err != nil {
        cclog.Errorf("S3Archive Init() > bucket access error: %v", err)
        return 0, fmt.Errorf("cannot access S3 bucket '%s': %w", s3a.bucket, err)
    }

    // Read version.txt from S3
    versionKey := "version.txt"
    result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(s3a.bucket),
        Key:    aws.String(versionKey),
    })
    if err != nil {
        cclog.Warnf("S3Archive Init() > cannot read version.txt: %v", err)
        return 0, err
    }
    defer result.Body.Close()

    versionBytes, err := io.ReadAll(result.Body)
    if err != nil {
        cclog.Errorf("S3Archive Init() > failed to read version.txt: %v", err)
        return 0, err
    }

    version, err := strconv.ParseUint(strings.TrimSuffix(string(versionBytes), "\n"), 10, 64)
    if err != nil {
        cclog.Errorf("S3Archive Init() > version parse error: %v", err)
        return 0, err
    }

    if version != Version {
        return version, fmt.Errorf("unsupported version %d, need %d", version, Version)
    }

    // Discover clusters by listing top-level prefixes
    s3a.clusters = []string{}
    paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
        Bucket:    aws.String(s3a.bucket),
        Delimiter: aws.String("/"),
    })

    for paginator.HasMorePages() {
        page, err := paginator.NextPage(ctx)
        if err != nil {
            cclog.Errorf("S3Archive Init() > failed to list clusters: %v", err)
            return 0, err
        }

        for _, prefix := range page.CommonPrefixes {
            if prefix.Prefix != nil {
                clusterName := strings.TrimSuffix(*prefix.Prefix, "/")
                // Filter out non-cluster entries
                if clusterName != "" && clusterName != "version.txt" {
                    s3a.clusters = append(s3a.clusters, clusterName)
                }
            }
        }
    }

    cclog.Infof("S3Archive initialized with bucket '%s', found %d clusters", s3a.bucket, len(s3a.clusters))
    return version, nil
}
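
Since Init refuses to run against a bucket without a readable version.txt, a fresh bucket has to be seeded once with the supported archive version. A minimal sketch using the same SDK calls as above; the bucket name and the pre-built client are assumptions:

    // client is any configured *s3.Client, e.g. built the same way as in Init.
    _, err := client.PutObject(context.Background(), &s3.PutObjectInput{
        Bucket: aws.String("cc-job-archive"), // assumed bucket name
        Key:    aws.String("version.txt"),
        Body:   strings.NewReader(fmt.Sprintf("%d\n", Version)),
    })
    if err != nil {
        cclog.Fatalf("cannot write version.txt: %v", err)
    }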

func (s3a *S3Archive) Info() {
    ctx := context.Background()
    fmt.Printf("S3 Job archive bucket: %s\n", s3a.bucket)

    ci := make(map[string]*clusterInfo)

    for _, cluster := range s3a.clusters {
        ci[cluster] = &clusterInfo{dateFirst: time.Now().Unix()}

        // List all jobs for this cluster
        prefix := cluster + "/"
        paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
            Bucket: aws.String(s3a.bucket),
            Prefix: aws.String(prefix),
        })

        for paginator.HasMorePages() {
            page, err := paginator.NextPage(ctx)
            if err != nil {
                cclog.Fatalf("S3Archive Info() > failed to list objects: %s", err.Error())
            }

            for _, obj := range page.Contents {
                if obj.Key != nil && strings.HasSuffix(*obj.Key, "/meta.json") {
                    ci[cluster].numJobs++
                    // Extract starttime from key: cluster/lvl1/lvl2/starttime/meta.json
                    parts := strings.Split(*obj.Key, "/")
                    if len(parts) >= 4 {
                        startTime, err := strconv.ParseInt(parts[3], 10, 64)
                        if err == nil {
                            ci[cluster].dateFirst = util.Min(ci[cluster].dateFirst, startTime)
                            ci[cluster].dateLast = util.Max(ci[cluster].dateLast, startTime)
                        }
                    }
                    if obj.Size != nil {
                        ci[cluster].diskSize += float64(*obj.Size) / (1024 * 1024) // Convert to MB
                    }
                }
            }
        }
    }

    cit := clusterInfo{dateFirst: time.Now().Unix()}
    w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', tabwriter.Debug)
    fmt.Fprintln(w, "cluster\t#jobs\tfrom\tto\tsize (MB)")
    for cluster, clusterInfo := range ci {
        fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%.2f\n", cluster,
            clusterInfo.numJobs,
            time.Unix(clusterInfo.dateFirst, 0),
            time.Unix(clusterInfo.dateLast, 0),
            clusterInfo.diskSize)

        cit.numJobs += clusterInfo.numJobs
        cit.dateFirst = util.Min(cit.dateFirst, clusterInfo.dateFirst)
        cit.dateLast = util.Max(cit.dateLast, clusterInfo.dateLast)
        cit.diskSize += clusterInfo.diskSize
    }

    fmt.Fprintf(w, "TOTAL\t%d\t%s\t%s\t%.2f\n",
        cit.numJobs, time.Unix(cit.dateFirst, 0), time.Unix(cit.dateLast, 0), cit.diskSize)
    w.Flush()
}

func (s3a *S3Archive) Exists(job *schema.Job) bool {
    ctx := context.Background()
    key := getS3Key(job, "meta.json")

    _, err := s3a.client.HeadObject(ctx, &s3.HeadObjectInput{
        Bucket: aws.String(s3a.bucket),
        Key:    aws.String(key),
    })

    return err == nil
}

func (s3a *S3Archive) LoadJobMeta(job *schema.Job) (*schema.Job, error) {
    ctx := context.Background()
    key := getS3Key(job, "meta.json")

    result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(s3a.bucket),
        Key:    aws.String(key),
    })
    if err != nil {
        cclog.Errorf("S3Archive LoadJobMeta() > GetObject error: %v", err)
        return nil, err
    }
    defer result.Body.Close()

    b, err := io.ReadAll(result.Body)
    if err != nil {
        cclog.Errorf("S3Archive LoadJobMeta() > read error: %v", err)
        return nil, err
    }

    if config.Keys.Validate {
        if err := schema.Validate(schema.Meta, bytes.NewReader(b)); err != nil {
            return nil, fmt.Errorf("validate job meta: %v", err)
        }
    }

    return DecodeJobMeta(bytes.NewReader(b))
}

func (s3a *S3Archive) LoadJobData(job *schema.Job) (schema.JobData, error) {
    ctx := context.Background()

    // Try compressed file first
    keyGz := getS3Key(job, "data.json.gz")
    result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(s3a.bucket),
        Key:    aws.String(keyGz),
    })

    if err != nil {
        // Try uncompressed file
        key := getS3Key(job, "data.json")
        result, err = s3a.client.GetObject(ctx, &s3.GetObjectInput{
            Bucket: aws.String(s3a.bucket),
            Key:    aws.String(key),
        })
        if err != nil {
            cclog.Errorf("S3Archive LoadJobData() > GetObject error: %v", err)
            return nil, err
        }
        defer result.Body.Close()

        if config.Keys.Validate {
            b, _ := io.ReadAll(result.Body)
            if err := schema.Validate(schema.Data, bytes.NewReader(b)); err != nil {
                return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
            }
            return DecodeJobData(bytes.NewReader(b), key)
        }
        return DecodeJobData(result.Body, key)
    }
    defer result.Body.Close()

    // Decompress
    r, err := gzip.NewReader(result.Body)
    if err != nil {
        cclog.Errorf("S3Archive LoadJobData() > gzip error: %v", err)
        return nil, err
    }
    defer r.Close()

    if config.Keys.Validate {
        b, _ := io.ReadAll(r)
        if err := schema.Validate(schema.Data, bytes.NewReader(b)); err != nil {
            return schema.JobData{}, fmt.Errorf("validate job data: %v", err)
        }
        return DecodeJobData(bytes.NewReader(b), keyGz)
    }
    return DecodeJobData(r, keyGz)
}

func (s3a *S3Archive) LoadJobStats(job *schema.Job) (schema.ScopedJobStats, error) {
    ctx := context.Background()

    // Try compressed file first
    keyGz := getS3Key(job, "data.json.gz")
    result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(s3a.bucket),
        Key:    aws.String(keyGz),
    })

    if err != nil {
        // Try uncompressed file
        key := getS3Key(job, "data.json")
        result, err = s3a.client.GetObject(ctx, &s3.GetObjectInput{
            Bucket: aws.String(s3a.bucket),
            Key:    aws.String(key),
        })
        if err != nil {
            cclog.Errorf("S3Archive LoadJobStats() > GetObject error: %v", err)
            return nil, err
        }
        defer result.Body.Close()

        if config.Keys.Validate {
            b, _ := io.ReadAll(result.Body)
            if err := schema.Validate(schema.Data, bytes.NewReader(b)); err != nil {
                return nil, fmt.Errorf("validate job data: %v", err)
            }
            return DecodeJobStats(bytes.NewReader(b), key)
        }
        return DecodeJobStats(result.Body, key)
    }
    defer result.Body.Close()

    // Decompress
    r, err := gzip.NewReader(result.Body)
    if err != nil {
        cclog.Errorf("S3Archive LoadJobStats() > gzip error: %v", err)
        return nil, err
    }
    defer r.Close()

    if config.Keys.Validate {
        b, _ := io.ReadAll(r)
        if err := schema.Validate(schema.Data, bytes.NewReader(b)); err != nil {
            return nil, fmt.Errorf("validate job data: %v", err)
        }
        return DecodeJobStats(bytes.NewReader(b), keyGz)
    }
    return DecodeJobStats(r, keyGz)
}

func (s3a *S3Archive) LoadClusterCfg(name string) (*schema.Cluster, error) {
    ctx := context.Background()
    key := fmt.Sprintf("%s/cluster.json", name)

    result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(s3a.bucket),
        Key:    aws.String(key),
    })
    if err != nil {
        cclog.Errorf("S3Archive LoadClusterCfg() > GetObject error: %v", err)
        return nil, err
    }
    defer result.Body.Close()

    b, err := io.ReadAll(result.Body)
    if err != nil {
        cclog.Errorf("S3Archive LoadClusterCfg() > read error: %v", err)
        return nil, err
    }

    if err := schema.Validate(schema.ClusterCfg, bytes.NewReader(b)); err != nil {
        cclog.Warnf("Validate cluster config: %v\n", err)
        return &schema.Cluster{}, fmt.Errorf("validate cluster config: %v", err)
    }

    return DecodeCluster(bytes.NewReader(b))
}

func (s3a *S3Archive) StoreJobMeta(job *schema.Job) error {
    ctx := context.Background()
    key := getS3Key(job, "meta.json")

    var buf bytes.Buffer
    if err := EncodeJobMeta(&buf, job); err != nil {
        cclog.Error("S3Archive StoreJobMeta() > encoding error")
        return err
    }

    _, err := s3a.client.PutObject(ctx, &s3.PutObjectInput{
        Bucket: aws.String(s3a.bucket),
        Key:    aws.String(key),
        Body:   bytes.NewReader(buf.Bytes()),
    })

    if err != nil {
        cclog.Errorf("S3Archive StoreJobMeta() > PutObject error: %v", err)
        return err
    }

    return nil
}

func (s3a *S3Archive) ImportJob(jobMeta *schema.Job, jobData *schema.JobData) error {
    ctx := context.Background()

    // Upload meta.json
    metaKey := getS3Key(jobMeta, "meta.json")
    var metaBuf bytes.Buffer
    if err := EncodeJobMeta(&metaBuf, jobMeta); err != nil {
        cclog.Error("S3Archive ImportJob() > encoding meta error")
        return err
    }

    _, err := s3a.client.PutObject(ctx, &s3.PutObjectInput{
        Bucket: aws.String(s3a.bucket),
        Key:    aws.String(metaKey),
        Body:   bytes.NewReader(metaBuf.Bytes()),
    })
    if err != nil {
        cclog.Errorf("S3Archive ImportJob() > PutObject meta error: %v", err)
        return err
    }

    // Upload data.json
    dataKey := getS3Key(jobMeta, "data.json")
    var dataBuf bytes.Buffer
    if err := EncodeJobData(&dataBuf, jobData); err != nil {
        cclog.Error("S3Archive ImportJob() > encoding data error")
        return err
    }

    _, err = s3a.client.PutObject(ctx, &s3.PutObjectInput{
        Bucket: aws.String(s3a.bucket),
        Key:    aws.String(dataKey),
        Body:   bytes.NewReader(dataBuf.Bytes()),
    })
    if err != nil {
        cclog.Errorf("S3Archive ImportJob() > PutObject data error: %v", err)
        return err
    }

    return nil
}

func (s3a *S3Archive) GetClusters() []string {
    return s3a.clusters
}

func (s3a *S3Archive) CleanUp(jobs []*schema.Job) {
    ctx := context.Background()
    start := time.Now()

    for _, job := range jobs {
        if job == nil {
            cclog.Errorf("S3Archive CleanUp() error: job is nil")
            continue
        }

        // Delete all files in the job directory
        prefix := getS3Directory(job)

        paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
            Bucket: aws.String(s3a.bucket),
            Prefix: aws.String(prefix),
        })

        for paginator.HasMorePages() {
            page, err := paginator.NextPage(ctx)
            if err != nil {
                cclog.Errorf("S3Archive CleanUp() > list error: %v", err)
                continue
            }

            for _, obj := range page.Contents {
                if obj.Key != nil {
                    _, err := s3a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
                        Bucket: aws.String(s3a.bucket),
                        Key:    obj.Key,
                    })
                    if err != nil {
                        cclog.Errorf("S3Archive CleanUp() > delete error: %v", err)
                    }
                }
            }
        }
    }

    cclog.Infof("Retention Service - Remove %d jobs from S3 in %s", len(jobs), time.Since(start))
}

func (s3a *S3Archive) Move(jobs []*schema.Job, targetPath string) {
    ctx := context.Background()

    for _, job := range jobs {
        sourcePrefix := getS3Directory(job)

        // List all objects in source
        paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
            Bucket: aws.String(s3a.bucket),
            Prefix: aws.String(sourcePrefix),
        })

        for paginator.HasMorePages() {
            page, err := paginator.NextPage(ctx)
            if err != nil {
                cclog.Errorf("S3Archive Move() > list error: %v", err)
                continue
            }

            for _, obj := range page.Contents {
                if obj.Key == nil {
                    continue
                }

                // Compute target key by replacing prefix
                targetKey := strings.Replace(*obj.Key, sourcePrefix, targetPath+"/", 1)

                // Copy object
                _, err := s3a.client.CopyObject(ctx, &s3.CopyObjectInput{
                    Bucket:     aws.String(s3a.bucket),
                    CopySource: aws.String(fmt.Sprintf("%s/%s", s3a.bucket, *obj.Key)),
                    Key:        aws.String(targetKey),
                })
                if err != nil {
                    cclog.Errorf("S3Archive Move() > copy error: %v", err)
                    continue
                }

                // Delete source object
                _, err = s3a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
                    Bucket: aws.String(s3a.bucket),
                    Key:    obj.Key,
                })
                if err != nil {
                    cclog.Errorf("S3Archive Move() > delete error: %v", err)
                }
            }
        }
    }
}

func (s3a *S3Archive) Clean(before int64, after int64) {
    ctx := context.Background()

    if after == 0 {
        after = math.MaxInt64
    }

    for _, cluster := range s3a.clusters {
        prefix := cluster + "/"

        paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
            Bucket: aws.String(s3a.bucket),
            Prefix: aws.String(prefix),
        })

        for paginator.HasMorePages() {
            page, err := paginator.NextPage(ctx)
            if err != nil {
                cclog.Fatalf("S3Archive Clean() > list error: %s", err.Error())
            }

            for _, obj := range page.Contents {
                if obj.Key == nil || !strings.HasSuffix(*obj.Key, "/meta.json") {
                    continue
                }

                // Extract starttime from key: cluster/lvl1/lvl2/starttime/meta.json
                parts := strings.Split(*obj.Key, "/")
                if len(parts) < 4 {
                    continue
                }

                startTime, err := strconv.ParseInt(parts[3], 10, 64)
                if err != nil {
                    cclog.Fatalf("S3Archive Clean() > cannot parse starttime: %s", err.Error())
                }

                if startTime < before || startTime > after {
                    // Delete entire job directory
                    jobPrefix := strings.Join(parts[:4], "/") + "/"

                    jobPaginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
                        Bucket: aws.String(s3a.bucket),
                        Prefix: aws.String(jobPrefix),
                    })

                    for jobPaginator.HasMorePages() {
                        jobPage, err := jobPaginator.NextPage(ctx)
                        if err != nil {
                            cclog.Errorf("S3Archive Clean() > list job error: %v", err)
                            continue
                        }

                        for _, jobObj := range jobPage.Contents {
                            if jobObj.Key != nil {
                                _, err := s3a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
                                    Bucket: aws.String(s3a.bucket),
                                    Key:    jobObj.Key,
                                })
                                if err != nil {
                                    cclog.Errorf("S3Archive Clean() > delete error: %v", err)
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}

func (s3a *S3Archive) Compress(jobs []*schema.Job) {
    ctx := context.Background()
    var cnt int
    start := time.Now()

    for _, job := range jobs {
        dataKey := getS3Key(job, "data.json")

        // Check if uncompressed file exists and get its size
        headResult, err := s3a.client.HeadObject(ctx, &s3.HeadObjectInput{
            Bucket: aws.String(s3a.bucket),
            Key:    aws.String(dataKey),
        })
        if err != nil {
            continue // File doesn't exist or error
        }

        if headResult.ContentLength == nil || *headResult.ContentLength < 2000 {
            continue // Too small to compress
        }

        // Download the file
        result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
            Bucket: aws.String(s3a.bucket),
            Key:    aws.String(dataKey),
        })
        if err != nil {
            cclog.Errorf("S3Archive Compress() > GetObject error: %v", err)
            continue
        }

        data, err := io.ReadAll(result.Body)
        result.Body.Close()
        if err != nil {
            cclog.Errorf("S3Archive Compress() > read error: %v", err)
            continue
        }

        // Compress the data
        var compressedBuf bytes.Buffer
        gzipWriter := gzip.NewWriter(&compressedBuf)
        if _, err := gzipWriter.Write(data); err != nil {
            cclog.Errorf("S3Archive Compress() > gzip write error: %v", err)
            gzipWriter.Close()
            continue
        }
        gzipWriter.Close()

        // Upload compressed file
        compressedKey := getS3Key(job, "data.json.gz")
        _, err = s3a.client.PutObject(ctx, &s3.PutObjectInput{
            Bucket: aws.String(s3a.bucket),
            Key:    aws.String(compressedKey),
            Body:   bytes.NewReader(compressedBuf.Bytes()),
        })
        if err != nil {
            cclog.Errorf("S3Archive Compress() > PutObject error: %v", err)
            continue
        }

        // Delete uncompressed file
        _, err = s3a.client.DeleteObject(ctx, &s3.DeleteObjectInput{
            Bucket: aws.String(s3a.bucket),
            Key:    aws.String(dataKey),
        })
        if err != nil {
            cclog.Errorf("S3Archive Compress() > delete error: %v", err)
        }

        cnt++
    }

    cclog.Infof("Compression Service - %d files in S3 took %s", cnt, time.Since(start))
}

func (s3a *S3Archive) CompressLast(starttime int64) int64 {
    ctx := context.Background()
    compressKey := "compress.txt"

    // Try to read existing compress.txt
    result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(s3a.bucket),
        Key:    aws.String(compressKey),
    })

    var last int64
    if err == nil {
        b, _ := io.ReadAll(result.Body)
        result.Body.Close()
        last, err = strconv.ParseInt(strings.TrimSuffix(string(b), "\n"), 10, 64)
        if err != nil {
            cclog.Errorf("S3Archive CompressLast() > parse error: %v", err)
            last = starttime
        }
    } else {
        last = starttime
    }

    cclog.Infof("S3Archive CompressLast() - start %d last %d", starttime, last)

    // Write new timestamp
    newValue := fmt.Sprintf("%d", starttime)
    _, err = s3a.client.PutObject(ctx, &s3.PutObjectInput{
        Bucket: aws.String(s3a.bucket),
        Key:    aws.String(compressKey),
        Body:   strings.NewReader(newValue),
    })
    if err != nil {
        cclog.Errorf("S3Archive CompressLast() > PutObject error: %v", err)
    }

    return last
}
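
A hypothetical compression run tying CompressLast and Compress together; loadJobsBetween is a made-up placeholder for whatever job query the caller uses:

    cutoff := time.Now().Unix()
    lastRun := s3a.CompressLast(cutoff)

    jobs, err := loadJobsBetween(lastRun, cutoff) // hypothetical helper returning []*schema.Job
    if err == nil {
        s3a.Compress(jobs)
    }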

func (s3a *S3Archive) Iter(loadMetricData bool) <-chan JobContainer {
    ch := make(chan JobContainer)

    go func() {
        ctx := context.Background()
        defer close(ch)

        for _, cluster := range s3a.clusters {
            prefix := cluster + "/"

            paginator := s3.NewListObjectsV2Paginator(s3a.client, &s3.ListObjectsV2Input{
                Bucket: aws.String(s3a.bucket),
                Prefix: aws.String(prefix),
            })

            for paginator.HasMorePages() {
                page, err := paginator.NextPage(ctx)
                if err != nil {
                    cclog.Fatalf("S3Archive Iter() > list error: %s", err.Error())
                }

                for _, obj := range page.Contents {
                    if obj.Key == nil || !strings.HasSuffix(*obj.Key, "/meta.json") {
                        continue
                    }

                    // Load job metadata
                    result, err := s3a.client.GetObject(ctx, &s3.GetObjectInput{
                        Bucket: aws.String(s3a.bucket),
                        Key:    obj.Key,
                    })
                    if err != nil {
                        cclog.Errorf("S3Archive Iter() > GetObject meta error: %v", err)
                        continue
                    }

                    b, err := io.ReadAll(result.Body)
                    result.Body.Close()
                    if err != nil {
                        cclog.Errorf("S3Archive Iter() > read meta error: %v", err)
                        continue
                    }

                    job, err := DecodeJobMeta(bytes.NewReader(b))
                    if err != nil {
                        cclog.Errorf("S3Archive Iter() > decode meta error: %v", err)
                        continue
                    }

                    if loadMetricData {
                        jobData, err := s3a.LoadJobData(job)
                        if err != nil {
                            cclog.Errorf("S3Archive Iter() > load data error: %v", err)
                            ch <- JobContainer{Meta: job, Data: nil}
                        } else {
                            ch <- JobContainer{Meta: job, Data: &jobData}
                        }
                    } else {
                        ch <- JobContainer{Meta: job, Data: nil}
                    }
                }
            }
        }
    }()

    return ch
}
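
Finally, a short sketch of draining the iterator, e.g. for a re-import or indexing pass (the loop body is an assumption):

    for jc := range s3a.Iter(false) {
        fmt.Printf("found job %d on cluster %s\n", jc.Meta.JobID, jc.Meta.Cluster)
    }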